白话RabbitMQ(三):发布/订阅
推广
RabbitMQ专题讲座
CoolMQ开源项目
我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面
前言
在第二章中我们描述了任务队列,在任务队列中一个消息只会发送给一个消费者。而在这一章中我们将消息发送给许多个消费者,我们称之为“发布/订阅”
为了更好的阐述这个模式,我们会建立一个新的简单的logging系统,包含2个步骤-第一步发送log信息,第二步能够接受并将信息打印出来,而且在第二步中所有的消费者都会接受到同样的消息,比如一个消费者用来将log信息写到磁盘,另外一个接受信息并显示在屏幕上。因此一旦有有消息,消息会广播到所有的消费者。
交换机(Exchanges)
前面的章节中我们是直接通过queue来处理消息,现在我们来介绍一种更完善的模式
让我们迅速浏览一遍前面的主题:
- 生产者是一个客户端程序,用来发送消息
- 队列是一个缓冲,用来存储消息
- 消费者是一个客户端程序,用来接受消息
RabbitMQ的核心思想是生产者不会将消息直接发送给队列,意味着生产者是完全看不到队列的。反之,生产者只能将消息发送给路由器(Exchange),再由路由器来决定该如何来处理消息,是将消息发送给一个队列呢,还是发送给许多个队列,或者直接无视,具体的规则是根据路由器的类型而定的。
路由器的类型有这样几种:直连路由器(dirct), 主题路由器(topic),头部路由器(headers),以及多广播路由器(fanout)
channel.exchangeDeclare("logs", "fanout");
广播路由器听起来就很简单,它会将消息广播到所有的它所知道的队列,而这正是我们所需要的。
默认路由器
在前面的章节中虽然没有设置任何路由器,但依然能够将消息发送到队列,这是因为我们的是默认路由器:使用空字符串("")来做的定义:
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数是exchange的名称,在这里是空字符串,消息会通过路由健(routingKey)发送到该键所对应的队列。
然而现在,我们有了确认的路由器
channel.basicPublish( "logs", "", null, message.getBytes());
临时队列
我们之前队列都有名字(Hello队列和task_queue队列),给队列起名字非常重要-需要将消费者绑定到特定的queue上面,以及需要把消息从生产者发送给特定的消费者。
但对于日志来说,消息会发送到所有的消费者,而并非个别,We're also interested only in currently flowing messages not in the old ones.为了满足当前需求我们可以做两件事
- 一旦连接上RabbitMQ,需要一个新的空队列来接受消息,我们可以随机起个名字,甚至根本不起名,而让RabbitMQ来命名它。
- 一旦消费者断开连接,这个队列就能被删除掉
我们可以这样定义一个不需要持久化、独立的、能够被自动删除的队列
String queueName = channel.queueDeclare().getQueue();
这个名称是RabbitMQ随机分配的,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg.
绑定
我们已经声明了一个广播路由器,现在需要告诉这个路由器需要把信息发送给哪些队列,路由器和队列间的这个关系就称之为绑定。
channel.queueBind(queueName, "logs", "");
如此一来路由器就能够把消息发送给相应的队列了。
整合
发送者与我们之前的代码基本相同,最重大的区别我们现在是发送给带名称的路由器了,同时我们也需要一个路由键,但这里也不需要,因为广播路由器会忽略这个值,这是我们EmitLog.java的代码
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
可以看到,一旦我们建立的连接立即定义了一个路由器,这个步骤对我们非常重要,因为是严禁将消息发送给并不存在的路由的。
同时,如果路由器没有绑定队列,消息也会丢失掉,但这对于我们来说是ok的:如果并没有消费者在监听,我们可以直接丢弃掉这个消息。
ReciveLogs.java代码如下:
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
编译代码
javac -cp $CP EmitLog.java ReceiveLogs.java
如果你希望将log存储到本机上
java -cp $CP ReceiveLogs > logs_from_rabbit.log
如果你希望在屏幕上显示log信息,打开一个新的终端:
java -cp $CP ReceiveLogs
发送消息
java -cp $CP EmitLog
如此一来,就能够存储消息的同时进行打印了。