rabbitmq学习4:Routing

  在《rabbitmq学习3:Publish/Subscribe 》中已经学习了发送一个消息,所有消费者端都能收到。那现在这节准备介绍通过路由规则来接受生产者端所发送的消费。Routing的工作示意图如下:


rabbitmq学习4:Routing

对于Routing的示意图与Publish/Subscribe中的示意图区别:

第一:Publish/Subscribe的Exchange的类型为“fanout”,而Routing的类型为“direct”

第二:Publish/Subscribe的路由为默认的,而Routing的路由是自定义的。

可能从上图的示意图如可以发现可以把Routing的模式也可以转化Publish/Subscribe的模式,如示意图


rabbitmq学习4:Routing

我们也可能把所有的数据发送到一个Queue中去,示意图如下:


rabbitmq学习4:Routing

下面我们就开始程序吧。

P端的程序如下:

package com.abin.rabbitmq;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class EmitLogDirect {  
    private static final String EXCHANGE_NAME = "direct_logs";  
  
    public static void main(String[] argv) throws Exception {  
  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
  
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");//rounting模式  
  
        String routingKeyOne = "error";//定义一个路由名为“error”  
        for (int i = 0; i <= 1; i++) {  
            String messageOne = "this is a error logs:" + i;  
            channel.basicPublish(EXCHANGE_NAME, routingKeyOne, null, messageOne  
                    .getBytes());  
            System.out.println(" [x] Sent '" + routingKeyOne + "':'" + messageOne  
                    + "'");  
        }  
  
        System.out.println("################################");  
        String routingKeyTwo = "info";  
        for (int i = 0; i <= 2; i++) {  
            String messageTwo = "this is a info logs:" + i;  
            channel.basicPublish(EXCHANGE_NAME, routingKeyTwo, null, messageTwo  
                    .getBytes());  
            System.out.println(" [x] Sent '" + routingKeyTwo + "':'" + messageTwo  
                    + "'");  
        }  
  
        System.out.println("################################");  
        String routingKeyThree = "all";  
        for (int i = 0; i <= 3; i++) {  
            String messageThree = "this is a all logs:" + i;  
            channel.basicPublish(EXCHANGE_NAME, routingKeyThree, null,  
                    messageThree.getBytes());  
            System.out.println(" [x] Sent '" + routingKeyThree + "':'"  
                    + messageThree + "'");  
        }  
  
        channel.close();  
        connection.close();  
    }  
}  

运行结果可能如下:

 [x] Sent 'error':'this is a error logs:0'  
 [x] Sent 'error':'this is a error logs:1'  
################################  
 [x] Sent 'info':'this is a info logs:0'  
 [x] Sent 'info':'this is a info logs:1'  
 [x] Sent 'info':'this is a info logs:2'  
################################  
 [x] Sent 'all':'this is a all logs:0'  
 [x] Sent 'all':'this is a all logs:1'  
 [x] Sent 'all':'this is a all logs:2'  
 [x] Sent 'all':'this is a all logs:3'  

C端的代码如下:

package com.abin.rabbitmq;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.QueueingConsumer;  
  
public class ReceiveLogsDirect {  
    private static final String EXCHANGE_NAME = "direct_logs";//定义Exchange名称  
  
    public static void main(String[] argv) throws Exception {  
  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
  
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");//声明Exchange  
  
        String queueName = "queue_logs1";//定义队列名为“queue_logs1”的Queue  
        channel.queueDeclare(queueName, false, false, false, null);  
        String routingKeyOne = "error";//"error"路由规则  
        channel.queueBind(queueName, EXCHANGE_NAME, routingKeyOne);//把Queue、Exchange及路由绑定  
        String routingKeyTwo = "info";  
        channel.queueBind(queueName, EXCHANGE_NAME, routingKeyTwo);  
  
        System.out.println(" [*] Waiting for messages.");  
  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        channel.basicConsume(queueName, true, consumer);  
  
        while (true) {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            String routingKey = delivery.getEnvelope().getRoutingKey();  
  
            System.out.println(" [x] Received '" + routingKey + "':'" + message  
                    + "'");  
        }  
    }  
}  

 这里我做了二个消费端程序来模仿通过路由规则来分配信息给各个消费端。第二个消费者端的程序只是修改了一小部分代码;只接受路由为”error“和”all“规则的消费。

运行程序1的结果如下:

[*] Waiting for messages.  
 [x] Received 'error':'this is a error logs:0'  
 [x] Received 'error':'this is a error logs:1'  
 [x] Received 'info':'this is a info logs:0'  
 [x] Received 'info':'this is a info logs:1'  
 [x] Received 'info':'this is a info logs:2'  

 运行程序2的运行结果如下:

[*] Waiting for messages.  
[x] Received 'error':'this is a error logs:0'  
[x] Received 'error':'this is a error logs:1'  
[x] Received 'all':'this is a all logs:0'  
[x] Received 'all':'this is a all logs:1'  
[x] Received 'all':'this is a all logs:2'  
[x] Received 'all':'this is a all logs:3'  
  • rabbitmq学习4:Routing
  • 大小: 11.7 KB
  • rabbitmq学习4:Routing
  • 大小: 9.6 KB
  • rabbitmq学习4:Routing
  • 大小: 10.7 KB