RabbitMQ基础学习03 发布/订阅
http://www.rabbitmq.com/tutorials/tutorial-three-java.html
发布/订阅
在前面的教程中,我们创建了一个工作队列(workqueue)。工作队列是假设每个任务都精确地发送到一个接收者。在这章中,我们将设定一个完全不同的情境:我们将会把一条消息发送给不同的接收者,这被称作:发布/订阅
为了更好地说明本章内容,我们将开发一个简单的日志系统,它有两个小程序组成:一个发布日志信息,另外一个将接收信息被打印出来。
在我们的日志系统中,每一个接收者都能接收到日志信息。这样的话,我们可以将一个接收者接收到的日志保存到硬盘,同时将另外一个接收者的日志打印出来。
从本质上来讲,发布的日志将会发布到所有的接收者。
Exchanges
在前面的教程中,我们通过一个队列来发送和接收消息。现在,我们需要了解清楚Rabbit的完整消息模型。
我们先快速地回顾一下之前的教程内容:
producer负责发送消息
queue负责存储消息
consumer负责接收消息
RabbitMQ的消息模型的关键在于:producer从不直接发送消息到队列。事实上,一个producer通常根本不知道一个消息将会被发送到哪个队列。
事实上,producer仅仅是将消息发送到一个Exchange。Exchange非常简单,一方面,它负责从producer接收消息,另一方面,它将消息推送给队列。Exchange必须准确地知道如何处理一条消息。是将消息推送给某一个队列,还是同时推送给多个队列,或者直接删除这个消息。这些规则都是根据exchangetype来定义的。
RabbitMQ有一些内置的exchangetype可以直接使用:direct,topic,headers,fanout。本章我们关注fanout。我们创建一个fanout类型的exchange,并且命名为logs:
channel.exchangeDeclare("logs","fanout");
fanout类型的exchange非常简单,你从它的名字就可以猜到,它将把消息推送给所有它知道的队列。这正是我们的消息日志系统所需要的。
发布消息:
channel.basicPublish("logs","",null,message.getBytes());
临时队列
在前面的教程中,我们使用队列时,都对队列进行命名。为队列命名对我们来说是非常重要的,因为我们需要在多个produces和consumers之间共享某个队列。
但是在我们的日志系统中却不需要对队列进行命名,因为我们需要接收所有的日志消息,而不仅仅是其中的一部分,我们也仅仅只关注当前的消息,而不是以前的旧的日志信息。为实现这个功能,我们需要做两件事情。
第一:当我们连接到Rabbit时,我们需要一个新的、空的队列。我们可以创建一个随机的队列,或者还有更好的方法,我们可以让服务器为我们随机选择一个队列名称。
第二:一旦我们断开连接,队列应当被自动删除。
在java中,我们可以这个创建一个非持久化的、专用的、能自动删除的随机队列:
String queueName = channel.queueDeclare().getQueue();
绑定
我们已经创建了一个fanout类型的exchange以及一个队列。现在我们需要告诉exchange将接收到的消息发送给我们的队列。这个建立关系的过程叫做绑定。
channel.queueBind(queueName, "logs", "");