RabbitMQ中文文档PHP版本(五)--主题

2019年12月10日10:05:11

原文:https://www.rabbitmq.com/tutorials/tutorial-five-php.html

话题

(使用php-amqplib

先决条件

本教程假定RabbitMQ 在标准端口(5672)的本地主机安装并运行如果您使用其他主机,端口或凭据,则连接设置需要进行调整。

在哪里获得帮助

如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系。

在上一教程中,我们改进了日志记录系统。我们没有使用只能进行虚拟广播扇出交换机,而是使用直接交换机,并有可能选择性地接收日志。

尽管使用直接交换改进了我们的系统,但它仍然存在局限性-它不能基于多个条件进行路由。

在我们的日志记录系统中,我们可能不仅要根据严重性订阅日志,还要根据发出日志的源订阅日志。您可能从syslog unix工具中了解了这个概念,该 工具根据严重性(info / warn / crit ...)和工具(auth / cron / kern ...)路由日志。

这将为我们提供很大的灵活性-我们可能只想听来自“ cron”的严重错误,也可以听“ kern”的所有日志。

为了在日志系统中实现这一点,我们需要学习更复杂的主题交换。

话题交流

发送到主题交换的消息不能具有任意的 routing_key-它必须是单词列表,以点分隔。这些词可以是任何东西,但是通常它们指定与消息相关的某些功能。一些有效的路由关键示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由密钥中可以包含任意多个单词,最多255个字节。

绑定密钥也必须采用相同的形式。主题交换背后的逻辑类似于直接交换的逻辑 -使用特定路由密钥发送的消息将被传递到所有使用匹配绑定密钥绑定的队列。但是,绑定键有两个重要的特殊情况:

  • *(星号)可以代替一个单词。
  • (哈希)可以替代零个或多个单词。

在一个示例中最容易解释这一点:

RabbitMQ中文文档PHP版本(五)--主题

在此示例中,我们将发送所有描述动物的消息。将使用包含三个词(两个点)的路由密钥发送消息。路由键中的第一个单词将描述速度,第二个将描述颜色,第三个将描述物种:“ <speed>。<color>。<species> ”。

我们创建了三个绑定:Q1与绑定键“ * .orange。* ” 绑定,Q2与“ *。*。rabbit ”和“ lazy。# ” 绑定

这些绑定可以总结为:

  • Q1对所有橙色动物都感兴趣。
  • 第2季想听听有关兔子的一切,以及有关懒惰动物的一切。

路由键设置为“ quick.orange.rabbit ”的消息将传递到两个队列。消息“ lazy.orange.elephant ”也将发送给他们两个。另一方面,“ quick.orange.fox ”只会进入第一个队列,而“ lazy.brown.fox ”只会进入第二个队列“ lazy.pink.rabbit ”将被传递到第二队只有一次,即使两个绑定匹配。“ quick.brown.fox ”与任何绑定都不匹配,因此将被丢弃。

如果我们违反合同并发送一个或四个单词的消息,例如“ 橙色 ”或“ quick.orange.male.rabbit ”,会发生什么?好吧,这些消息将不匹配任何绑定,并且将会丢失。

另一方面,“ lazy.orange.male.rabbit ”即使有四个单词,也将匹配最后一个绑定,并将其传送到第二个队列。

话题交流

主题交流功能强大,可以像其他交流一样进行。

当队列用“  ”(哈希)绑定键绑定时,它将接收所有消息,而与路由键无关,就像在扇出交换中一样。

在绑定中不使用特殊字符“ * ”(星号)和“  ”(哈希)时,主题交换的行为就像直接的一样

放在一起

我们将在日志记录系统中使用主题交换。我们将从一个可行的假设开始,即日志的路由键将包含两个词:“ <facility>。<severity> ”。

该代码与上一教程中的代码几乎相同 

的代码emit_log_topic.php

<?php

require_once __DIR__ . ‘/vendor/autoload.php‘;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection(‘localhost‘, 5672, ‘guest‘, ‘guest‘);
$channel = $connection->channel();

$channel->exchange_declare(‘topic_logs‘, ‘topic‘, false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : ‘anonymous.info‘;
$data = implode(‘ ‘, array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, ‘topic_logs‘, $routing_key);

echo ‘ [x] Sent ‘, $routing_key, ‘:‘, $data, "\n";

$channel->close();
$connection->close();

receive_logs_topic.php的代码

require_once __DIR__ . ‘/vendor/autoload.php‘;
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection(‘localhost‘, 5672, ‘guest‘, ‘guest‘);
$channel = $connection->channel();

$channel->exchange_declare(‘topic_logs‘, ‘topic‘, false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents(‘php://stderr‘, "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, ‘topic_logs‘, $binding_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ‘ [x] ‘, $msg->delivery_info[‘routing_key‘], ‘:‘, $msg->body, "\n";
};

$channel->basic_consume($queue_name, ‘‘, false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

接收所有日志:

php receive_logs_topic.php "#"

要从设施“ kern ” 接收所有日志

php receive_logs_topic.php "kern.*"

或者,如果您只想听听“ 关键 ”日志:

php receive_logs_topic.php "*.critical"

您可以创建多个绑定:

php receive_logs_topic.php "kern.*" "*.critical"

并发出带有路由键“ kern.critical ” 的日志类型:

php emit_log_topic.php "kern.critical" "A critical kernel error"

玩这些程序玩得开心。请注意,该代码未对路由键或绑定键进行任何假设,您可能要使用两个以上的路由键参数。

embed_log_topic.php 和receive_logs_topic.php的完整源代码

接下来,在教程6中找出如何做往返消息作为远程过程调用