第十七天-关于消息传递

Day 17: Something about messaging (but I couldn’t think of a snappier title.)

Day 17: Something about messaging (but I couldn’t think of a snappier title.)

为什么要传递消息

当我第一次开始考虑写今年的 Advent 文章时,我反思我在过去的十二个月里并没有真正写过大量的 Raku,与往年相比,我似乎写了大量的模块。我一直在做的事情(至少在我的日常工作中)正在考虑和实施大量使用某些消息传递系统的应用程序。所以我认为将这些想法引入 Raku 会很有趣。

作为一种“胶水语言”,Perl一直享有盛誉,Raku 具有与之竞争的功能,最显着的是响应式和并发功能,因此非常适合创建基于消息的集成服务。

传递什么信息

现在我的脚下就是优秀的企业集成模式,尽管它现在已经有近15年的历史了,但我仍然建议任何有兴趣(或工作于)该领域的人。然而,它是一个重量级的书(字面上,它在硬书中的重量接近一点五公斤),所以我用它来提醒自己不要试图在这个主题上详尽无遗,以免这会变成一本书本身。

有相当多的自由和商业管理消息系统,使用一系列开放和专有的协议,但我将限制自己到我熟悉的 RabbitMQ,并且在 Raku 中由 Net::AMQP 支持。

如果你想亲自尝试一下这些例子,你将需要访问一个 RabbitMQ 代理(它可以作为大多数操作系统发行版的包),但是你可以使用 Docker Image,它看起来工作得很好。

您还需要安装 Net::AMQP,这可以通过以下方式完成:

zef install Net::AMQP

在示例中,我将使用 RabbitMQ 服务器的默认连接详细信息(即代理正在本地主机上运行,​​并且默认 guest 处于活动状态),如果您需要提供不同的详细信息,则可以更改 Net::AMQP 的构造函数以反映适当的值:

my $n = Net::AMQP.new(
  host => 'localhost',
  port => 5672,
  login => 'guest',
  password => 'guest',
  vhost => '/'
);

一些示例可能需要其他模块,但我会在介绍时介绍它们。

强制性的你好,世界

RabbitMQ实现了由AMQP v0.9规范描述的丰富的代理体系结构,由ActiveMQ实现的最新的v1.0规范取消了大部分规定的代理语义,以至于它基本上是一种不同的协议,它共享一个类似的电线格式。

发送消息(生产者)的最简单可能的例子可能是:

use Net::AMQP;

my $n = Net::AMQP.new;

await $n.connect;
my $channel = $n.open-channel(1).result;
my $exchange = $channel.exchange.result;
$exchange.publish(routing-key => "hello", body => "Hello, World".encode);
await $n.close("", "");

这演示了RabbitMQ和Net :: AMQP的大部分核心功能。

首先你会注意到许多方法返回一个Promise,它将大部分保留在实际的返回值中,这反映了代理的异步性质,它发送(大多数情况但不是全部)确认消息(AMQP说法中的方法,)当操作在服务器上完成时。

这里的连接建立到代理的网络连接并且协商某些参数,如果网络连接失败,提供的凭证不正确或者服务器拒绝某个其他连接,则返回一个Promise,如果成功或失败,它将保留一个真值原因。

开放通道打开一个逻辑代理通信通道,在这个通道中交换消息,您可以在应用程序中使用多个通道。当服务器确认后,返回的Promise将保留在初始化的Net :: AMQP :: Channel对象中。

通道对象上的交换方法返回一个Net :: AMQP :: Exchange对象,在AMQP模型中,所有消息都发布到交换机上,根据交换机的定义,代理可以将消息路由到一个或多个队列由此消息可能被另一客户消耗。在这个简单的例子中,我们将使用默认交换(名为amq.default。)

发布方法是在交换对象上调用的,它没有返回值,因为它只是简单的触发和遗忘,代理不会确认收到和交付,否则队列与发布消息的行为是分离的。顾名思义,路由密钥参数是由代理用来确定将消息路由到哪个队列(或多个队列)。在这个例子中使用默认交换的情况下,交换的类型是直接的,这基本上意味着消息传递到具有与路由密钥匹配的名称的队列中的一个消费者。正文总是一个Buf,并且可以是任意长度,在这种情况下,我们使用的是编码字符串,但它可以同样编码为JSON,MessagePack或BSON blob,无论适合消费应用程序。事实上可以提供内容类型和内容编码参数,如果应用程序的设计需要它,消息将传递给消费者,但代理本身完全不知道有效内容的内容。还有其他可选参数,但在这个例子中不需要。

当然,我们也需要阅读我们发布的消息(消费者):

use Net::AMQP;

my $n = Net::AMQP.new;

my $connection = $n.connect.result;

react {
    whenever $n.open-channel(1) -> $channel {
        whenever $channel.declare-queue("hello") -> $queue {
            $queue.consume;
            whenever $queue.message-supply.map( -> $v { $v.body.decode }) -> $message {
                say $message;
                $n.close("", "");
                done();
            }
        }
    }
}

在这里,我们使用的是一个命名队列,而不是像我们在制作人那样在交易所进行操作;如果队列尚不存在,declare-queue将导致队列被创建,并且代理默认将该队列绑定到默认交换,“绑定”实质上意味着发送到交换的消息可以被路由到队列取决于交换类型,消息的路由键以及可能来自消息的其他元数据。在这种情况下,默认交换的“直接”类型将导致消息被路由到与路由密钥相匹配的队列(如果存在的话,如果消息不存在,消息将被无声地丢弃)。

当您准备好开始接收消息时调用消费方法,它将返回一个Promise,该Promise将与“消费者标签”一起保存,该标签将消费者唯一标识给服务器,但由于我们不需要它,因此我们可以忽略它。

一旦我们调用了消费(并且代理发送了确认),那么路由到我们队列的消息将作为Net :: AMQP :: Queue :: Message对象发送到由消息供应返回的Supply,但是因为我们对这个例子中的消息元数据不感兴趣映射被用来创建具有消息的解码体的新的Supply;这是安全的,因为在这种情况下,您可以保证您将接收utf-8编码,但是在真实世界的应用程序中,如果您不控制发送者,您可能希望在处理身体方面更强壮一些(当与第三方应用程序集成时通常是这种情况)。发布消息时提供的内容类型和内容编码在Message对象的headers属性(一个Hash)中可用,但它们不是必需的因此您可能需要考虑适合您的应用的替代方案。

在这个例子中,连接被关闭,并且在接收到第一条消息之后退出响应,但实际上您可能需要删除这些行:

$n.close("", "");
done();

从内到外,如果你想退出一个信号例如添加:

whenever signal(SIGINT) {
    $n.close("", "");
    done();
}

在反应区的最高层。但是,如果您选择退出程序,则应始终在连接对象上调用close,因为这会在代理日志中引发警告消息,如果不这样做,可能会使管理服务器的人感到不安。

我们当然可以用类似的方式在生产者示例中使用反应语法,但是它会增加冗长的好处,但是在一个更大的程序中,例如,您可能正在处理一个Supply,它可以很好地工作很好:

use Net::AMQP;
  
my $supply = Supply.from-list("Hello, World", "Bonjour le monde", "Hola Mundo");
my $n = Net::AMQP.new;

react {
    whenever $n.connect {
        whenever $n.open-channel(1) -> $channel {
            whenever $channel.exchange -> $exchange {
                whenever $supply.map(-> $v { $v.encode }) -> $body {
                    $exchange.publish(routing-key => "hello", :$body );
                    LAST {
                        $n.close("", "");
                        done();
                    }
                }
            }
        }
    }
}

一些更有用的东西

你可能会认为“这一切都很好,但这不是我不能做的事情,比如说,一个HTTP客户端和一个小型Web服务器”,好吧,你得到可靠的排队,未读消息的持久性等等,但是,对于简单的应用程序来说,它可能会被过度杀死,直到您添加了将消息发送给多个可能未知的消费者的需求为止。这种模式是使用“扇出”交换类型,它将向绑定到交换的所有队列传递消息。

在这个例子中,我们需要声明自己的队列,以便我们可以指定类型,但是生产者不会变得更加复杂:

use Net::AMQP;

my $n = Net::AMQP.new;
my $con =  await $n.connect;
my $channel = $n.open-channel(1).result;
my $exchange = $channel.declare-exchange('logs', 'fanout').result;
$exchange.publish(body => 'Hello, World'.encode);
await $n.close("", "");

这里唯一的区别是我们使用声明交换而不是在通道上交换来获得我们发送消息的交换,这样做的好处是使交换在指定类型的代理上创建已经存在,这在这里很有用,因为我们不需要依赖事先创建的交换(使用命令行工具rabbitmqctl或通过web管理界面),但它同样返回一个Promise,它将与Exchange交换目的。您可能还注意到,这里的路由密钥没有被传递给发布方法,这是因为对于扇出交换,路由密钥被忽略,并且消息被传递到绑定到交换机的所有消耗队列。

消费者代码与我们的原始消费者同样不存在差异:

use Net::AMQP;

my $n = Net::AMQP.new;

my $connection = $n.connect.result;

react {
    whenever $n.open-channel(1) -> $channel {
        whenever $channel.declare-exchange('logs', 'fanout') -> $exchange {
            whenever $channel.declare-queue() -> $queue {
                whenever $queue.bind('logs') {
                    $queue.consume;
                    whenever $queue.message-supply.map( -> $v { $v.body.decode }) -> $message {
                        say $*PID ~ " : " ~ $message;
                    }
                }
                whenever signal(SIGINT) {
                    say $*PID ~ " exiting";
                    $n.close("", "");
                    done();
                }

            }
        }
    }
}

交换的声明与生产者示例中声明的方式相同,这非常方便,因此您不必担心启动程序的顺序,第一次运行将创建队列,但是如果您在消费者启动之前运行生产者,发送的消息将被丢弃,因为默认情况下没有路由它们。这里我们还声明了一个没有提供名称的队列,这会创建一个“匿名”队列(该名称由代理组成),因为队列的名称在此路由消息中不起作用案件。

您可以提供一个队列名称,但如果名称重复,那么这些消息将以“先到先得”的方式路由到具有相同名称的队列,这可能不是预期的行为(尽管可能并可能有用。)

同样在这种情况下,队列必须明确地绑定到我们已经声明的交易所,在第一个例子中,默认交易所的绑定是由代理自动执行的,但在大多数情况下,您将不得不在队列上使用绑定交易所的名称。与许多方法一样,绑定返回一个Promise,当代理确认操作已完成时将保留Promise(尽管在这种情况下,值不重要)。

您应该能够根据需要启动尽可能多的消费者,并且他们都将按照发送的顺序接收所有消息。当然,在真实世界的应用程序中,消费者可能是用各种不同语言编写的完全不同的程序。

保持主题

一种常见模式是一组消费者,他们只对发布到特定交易所的某些消息感兴趣,其典型例子可能是记录系统,其中有专门针对不同日志级别的消费者。 AMQP提供了一种话题交换类型,允许通过生产者提供的路由密钥上的模式匹配将消息路由到特定的队列。

最简单的生产者可能是:

use Net::AMQP;

multi sub MAIN(Str $message = 'Hello, World', Str $level = 'application.info') {
	my $n = Net::AMQP.new;
	my $con =  await $n.connect;
	my $channel = $n.open-channel(1).result;
	my $exchange = $channel.declare-exchange('topic-logs', 'topic').result;
	$exchange.publish(routing-key => $level, body => $message.encode);
	await $n.close("", "");
}

这应该从前面的例子中相当清楚,除了在这种情况下,我们将交换声明为主题类型,并且还提供将由代理用于匹配消费队列的路由密钥。

消费者代码本身又与前面的例子非常相似,只不过它会在命令行上列出一些用于匹配发送到交换机的路由密钥的模式:

use Net::AMQP;

multi sub MAIN(*@topics ) {
    my $n = Net::AMQP.new(:debug);
    unless @topics.elems {
        say "will be displaying all the messages";
        @topics.push: '#';
    }
    my $connection = $n.connect.result;
    react {
        whenever $n.open-channel(1) -> $channel {
            whenever $channel.declare-exchange('topic-logs', 'topic') -> $exchange {
                whenever $channel.declare-queue() -> $queue {
                    for @topics -> $topic {
                        await $queue.bind('topic-logs', $topic);
                    }
                    $queue.consume;
                    my $body-supply = $queue.message-supply.map( -> $v { [ $v.routing-key, $v.body.decode ] }).share;
                    whenever $body-supply -> ( $topic , $message ) {
                            say $*PID ~ " : [$topic]  $message";
                    }
                }
            }
        }
    }
}

这里基本上与前面的消费者示例的唯一区别是(除了提供给交换声明的类型)该主题提供给绑定方法。该主题可以是一个简单模式,其中#将匹配任何提供的路由密钥,并且行为将与扇出交换相同,否则可以在绑定主题的任何部分用作通配符,以匹配任何字符在这个例子中,在这个例子中,应用程序将匹配使用路由关键字application.info或application.debug发送的消息。

如果有多于一个队列使用相同的模式绑定,则它们的行为也会像绑定到扇出交换机一样。如果绑定模式既不包含哈希也不包含星号字符,那么队列的行为就好像它被绑定到一个直接交换的那个名称的队列一样(也就是说它将有先到先服务基础。)

但是,生命比AMQP更重要

当然。 Raku反应模型的优点在于可以将上面提到的供应商提供的各种源集成到您的生产者代码中,并且类似地,消费者可以将消息推送到另一个传输机制。

我很高兴地发现,当我想到这个例子的时候,下面的工作是正常的:

use EventSource::Server;
use Net::AMQP;
use Cro::HTTP::Router;
use Cro::HTTP::Server;

my $supply = supply { 
	my $n = Net::AMQP.new;
	my $connection = $n.connect.result;
	whenever $n.open-channel(1) -> $channel {
		whenever $channel.declare-queue("hello") -> $queue {
			$queue.consume;
			whenever $queue.message-supply.map( -> $v { $v.body.decode }) -> $data {
				emit EventSource::Server::Event.new(type => 'hello', :$data);
			}
		}
	}
};

my $es = EventSource::Server.new(:$supply);

my $application = route {
	get -> 'greet', $name {
		content 'text/event-stream; charset=utf-8', $es.out-supply;
	}
}
my Cro::Service $hello = Cro::HTTP::Server.new:
	:host, :port, :$application;
$hello.start;

react whenever signal(SIGINT) { $hello.stop; exit; }

这是EventSource :: Server中的示例的变体,您当然可以修改它以使用上面讨论的任何交换类型。它应该适用于第一个例子中的生产者代码。而且(如果你是这么说服的话),你可以用一小段node.js代码(或者在一些面向浏览器的javascript中)来消费事件:

	var EventSource = require('eventsource');

	var event = process.argv[2] || 'message';

	console.info(event);
	var v = new EventSource(' http://127.0.0.1:10000');

	v.addEventListener(event, function(e) {
		console.info(e);

	}, false);

把它包起来

在输入第一段之后,我总结道,在一篇短文中,我永远无法做到这个主题正义,所以我希望你认为这是一个开胃菜,我不认为我会永远找到时间来写书,它可能值得。但是我确实有基于 RabbitMQ 教程的所有示例,因此请检查并随意贡献。

Raku 

comments powered by Disqus