【译】RabbitMQ系列(五) - 主题模式

主题模式

在上一章我们改进了我们的日志系统,如果使用fanout我们只能简单进行广播,而使用direct则允许消费者可以进行一定程度的选择。但是direct还是有其局限性,其路由不支持多个条件。

在我们的日志系统中,消费者程序可能不止是基于日志的severity,同时也想基于发送日志的源系统。你可能知道linux的syslog工具,它就是同时基于severity(info/warn/crit...)和功能(auth/cron/kern...).

这就提供了很大的灵活性-我们想接收来自cron的严重错误日志和kern的所有日志。

下面我们就使用更复杂的topic来改进我们的日志系统。

Topic exchange

发送到topic类型exchange的message不可以具有模糊的routing_key,它必须具有以冒号分割的词。就像"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"等,限制长度255字节。

binding key也采用相似的形势。topic exchange的逻辑和direct相似,通过比较message的routing key和bind的binding key,来匹配转发的queue。但是topic的binding支持通配符:

  • ” * “表示任何一个词
  • ” # “ 表示0或1个词

【译】RabbitMQ系列(五) - 主题模式
通过上面图示的场景来解释会比较好理解。

例子中我们将发送描述动物的message。message会携带routing key(包含三个词),第一个词表示speed,第二个表示color,第三个表示species"<speed>.<colour>.<species>".

创建了三个绑定:Q1的binding key是”*.orange.*" Q2的binding key是“*.*.rabbit”和 "lazy.#".

以文字表述便是:

  • Q1 关心所有橘色的动物
  • Q2 关心所有的rabbit和所有的lazy动物

routing key为“quick.orange.rabbit"的message会同时发布到这两个queue。
routing key为"lazy.orange.elephant"的message会同时发布到这两个queue。
routing key为”quick.orange.fox“只会发布到第一个queue.
routing key为”lazy.brown.fox"的message只会发布到第二个queue.
routing key为"lazy.pink.rabbit"的message虽然满足Q2的两个条件,但也只会发布到Q2一次。
routing key为"quick.brown.fox"的message没有任何匹配,就会被丢失。

如果我们发送的message只有一个word或者多余三个word,如"orange"或者"quick.orange.male.rabbit"会发生什么呢?这些message不会匹配任何binding key,均会被丢弃掉。

另外"lazy.orange.male.rabbit"虽然具有四个词,但是会匹配最后的binding key,而被发送到第二个queue。

Topic exhange非常强大,同时可以模仿其他两种类型的exchange。当binding key为 # 时,queue会接收所有的message。当binding key中没有使用通配符(* 和 #)时,topic的行为和direct一致。

开始执行

我们将在日志系统中使用topic exchange。我们的routding key采用两个词 "<facility>.<severity>".
EmitLogTopic.java的代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    }
  }
  //..
}

ReceiveLogsTopic.java的代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
        System.exit(1);
    }

    for (String bindingKey : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

编译

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

接收所有日志

java -cp $CP ReceiveLogsTopic "#"

接收功能"kern"的日志

java -cp $CP ReceiveLogsTopic "kern.*"

接收严重级别日志

java -cp $CP ReceiveLogsTopic "*.critical"

接收者使用两个绑定条件

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

发送日志message

java -cp $CP EmitLogTopic "kern.critical" "A critical kernal error"

相关推荐