RabbitMQ路由模式(Routing)

一、Routing模式简介

RabbitMQ的路由模式,可以简单理解为,根据exchange绑定的key,将消息路由到不同的queue,模型图如下:
RabbitMQ路由模式(Routing)
上图中几个关键点:
Publisher:消息的生产者
Exchange:路由,类型为direct
Queue:队列,存储消息的地方
Consumer:消费者

二、生产者发送消息

生产者不直接直接发送消息到队列的,只发送到相应的exchange上,代码如下:

static string EXCHANGE_NAME = "topic_exchange_direct";
        static void TopicPublisher()
        {
            var conn = RabbitMQHelper.GetConnection();
            var channel = conn.CreateModel();
            channel.ExchangeDeclare(exchange: EXCHANGE_NAME,type:ExchangeType.Direct);         
            string routingKey = "error";
            var msg = $"hello topic {routingKey}!";
            var body = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: routingKey, basicProperties: null, body: body);
            Console.WriteLine($"消息:{msg} ,发送完成!");
            channel.Close();
            conn.Close();          
        }

发送成功,图如下:
RabbitMQ路由模式(Routing)
我们去rabbitmq的管理页面,可以看到消息已经发送成功,但是还没有消费者绑定到这个exchange上:
RabbitMQ路由模式(Routing)

三、消费者

要消费topic类型的消息,消费者需要绑定自己需要的路由键(rountingKey),一个消费者可以绑定多个key

3.1 路由键为error的消费者1,代码如下:

static string EXCHANGE_NAME = "topic_exchange_direct";
        static string QUEUE_NAME = "topic_queue_direct1";
        static void TopicConsumer1()
        {
            var conn = RabbitMQHelper.GetConnection();
            var channel = conn.CreateModel();
            channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);
            channel.QueueDeclare(queue: QUEUE_NAME);
            // 绑定routingKey
            string routingKey = "error";
            channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine($"Topic Consumer1 收到消息: {message},时间:{DateTime.Now}");
            };
            channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
            Console.ReadKey();
            channel.Close();        
            conn.Close();        
        }

3.2 路由键为error、info、warning的消费者2,代码如下:

static string EXCHANGE_NAME = "topic_exchange_direct";
        static string QUEUE_NAME = "topic_queue_direct2";
        static void TopicConsumer2()
        {
            var conn = RabbitMQHelper.GetConnection();
            var channel = conn.CreateModel();
            channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);
            channel.QueueDeclare(queue: QUEUE_NAME);
            string routingKeyInfo = "info";
            string routingKeyError = "error";
            string routingKeyWarning = "warning";
            // 可以绑定多个routingKey
            channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKeyInfo);
            channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKeyError);
            channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKeyWarning);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine($"Topic Consumer2 收到消息: {message},时间:{DateTime.Now}");
            };
            channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
            Console.ReadKey();
            channel.Close();
            conn.Close();
        }

3.4 测试发送不同topic的消息

(1) 启动两个消费者后,我们可以看到一共有四个key绑定到交换机上;
消费者1的队列(topic_queue_direct2)绑定了一个key:error,消费者2的队列(topic_queue_direct2)绑定了三个key:error、info、warning
RabbitMQ路由模式(Routing)
(2)发送一个rountingKey为error的消息,再发送一个rountingKey为info的消息,我们查看客户端接收消息的情况:
消费者1只接收到了一条消息,图如下:
RabbitMQ路由模式(Routing)

消费者2两条消息都接收到了,图如下:
RabbitMQ路由模式(Routing)
至此,我们的Topic模式已经验证完毕。

四、小结

1、Routing模式需要绑定到交换机,类型为direct
2、routingKey为分发消息的依据,生产者发送消息时要指定routingKey;消费者可以绑定多个routingKey;
参考资料:https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

相关推荐