RabbitMQ 学习初入门
RabbitMQ是一个消息队列,我们可以使用RabbitMQ 做消息队列,消息通知的业务功能,而且根据网上的不可靠消息得出,RabbitMQ 的性能水平甚至比 activeMQ 还要好,所以也是我选择认真去学习RabbitMQ的原因,当然我也有做个关于 activeMQ 的一些简单的 Demo 有机会的话可以分享出来~
Rabbit 使用 Erlang 来编写的AMQP 服务器。 什么是 AMQP 本人已经百度给大家了:
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。
哎哟,先从按照说起吧~ 我使用macbook 来进行学习和开发的 顺便提一下 本人是位果粉,别喷~ 使用 MAC OS 安非常简单,还我是建议使用 MAC OS 的同学使用 brew 进行开发,没有别的 就是因为方便。
使用这个命令之前 如果没有安装过 Homebrew 就安装一下吧 有机会我也分享一下安装方式 顺便介绍一下这个让你懒癌晚期的Homebrew。
打开终端输入:
brew install rabbitmq
然后会告诉你 无法获得 /usr/local/sbin 的写入权限。 因为是英文的错误提示,本人会看不会写 所以直接写中文给各位看了~ 还是那句 别喷~ 谢谢
好,接下来让他有权限,其实这个目录是没有的sbin。
mkdir /usr/local/sbin 创建目录 chmod 777 /usr/local/sbin 开权限,全开别烦 brew link rabbitmq link 一下
然后就没有然后了
启动 rabbitMQ
MacBook-Pro:~ Tony$ /usr/local/sbin/rabbitmq-server RabbitMQ 3.5.6. Copyright (C) 2007-2015 Pivotal Software, Inc. ## ## Licensed under the MPL. See http://www.rabbitmq.com/ ## ## ########## Logs: /usr/local/var/log/rabbitmq/[email protected] ###### ## /usr/local/var/log/rabbitmq/[email protected] ########## Starting broker... completed with 10 plugins.
目前为止已经安装完 rabbitMQ 并且启动起来 了 简单到飞
开始废话 说说概念,别方 我也是抄书的~
消费者和生产者,由于书本的篇幅太长,就抄了!简单明了告诉你们,消息的产生者就是生产者,获得消息的称为消息消费者。我QQ M了你一下 我就是消息的生产者了,你在QQ看到我的消息了,你就是消息的消费者了,这才是正常写书方式嘛··· 举个开发的业务背景的例子,我支付了一张订单,这个业务产生了几个次要的业务,例如积分业务,我购买了订单,订单核心功能处理订单的业务,然后生产了一个消息并通过rabbitMQ 发送了一个消息,内容为产生了一个订单,然后订阅了消息的消息消费者就会消费消息,然后根据消息处理自己的业务,该干嘛干嘛去~ 积分业务就去加你的积分等等。当然这个只是举例,别吐槽 可以在台服务器或者一个业务方法上实现。 上面的举例对于一些分布式的微服务上是非常常见的方式了~
几个重要的概念:
信道:在连接到rabbitMQ 之后,你将可以创建并获得一个AMQP信道,信道是创建在“真实的”TCP连接内的虚拟链接。所有的AMQP命令都是通过 AMQP 信道发出去的,每条信道会被指派一个唯一的ID。所有发布消息还是订阅都是通过AMQP 信道去处理的。这个概念非常像端口号【但是不是端口啊 ,别说我混淆你】,而且他的英文名字可以理解到他的意思channel 有使用NIO 的开发者已经比较了解~ 其实使用AMQP信道的原因非常简单,建立TCP连接的性能代价是非常高的,当你在不同的线程当中去创建TCP连接的方式去和RabbitMQ进行通信的代价是十分高的,但是在多个不同的线程当中通过channel 我们就可以通过同一个TCP连接进行通信了。好像写得好乱不知道他们理不理解我写什么,唉 算了 本人的水平有限,况且大多数情况下都是我自己看而已,放弃~ 抽象图如下 丑,不过先将就~
交换器 与 路由键: 这个好难解释啊~ 直接说工作流程吧~ 哎哟其实应该把 队列写到这里来,算了吧 不改了。 先记住,交换器当中里面有N个队列,而路由键 是让交换器知道消息应该放到哪个队列当中去。别废话举例: 产生消息 要告诉 RabbitMQ 这个消息是放到那个交换器上,【注意:交换器是由开发者去创建的你可以搞N个交换器出来,没有管你】
上代码:
this.channel.basicPublish(“hello-exchange”,”hola”,properties, SerializationUtils.serialize(msg));
首先 hello-exchange 是交换器的名字, hola 是路由键 这个让hello-exchange 去判断应该分发到那个队列当中,SerializationUtils.serialize(msg) 将一个消息序列化。 channel 就不用说啦上面说了,是一个AMQP信道。
交换器有很多不同的类型一会会一一介绍。交换器的类型不同也注定了消息会分发到哪个队列当中,所有尤其重要。
工作流程:
1、消费者订阅消息,先创建一个交换器,如果交换器已经存在即返回一个交换器。当然你可以passive 为 true 如果 passive 为 true , 交换器已经存在返回一个已经存在的交换器,否则报错。我暂时不知道用在哪里,所以 先记住一般情况下 创建一个交换器,如果存在就返回一个已经存在的交换器,如果不存在则创建并返回。
JAVA 代码如下【直接给源码的方法定义,是不是很贴心】:
备注是自己翻译的别喷 /** * Declare an exchange, via an interface that allows the complete set of * arguments. * @see com.rabbitmq.client.AMQP.Exchange.Declare * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk * @param exchange the name of the exchange 【交换器名称】 * @param type the exchange type 【交换器类型 一会说 别着急】 * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) 【是否持久化交换器,这个后面说,还是简单说一下,纠结帝~ 就是交换器中的消息会持久化,就是会存在硬盘当中,宕机或重启服务时会自动恢复当时会牺牲性能,除非你的SSD硬盘好快,当我没有说过~】 * @param autoDelete true if the server should delete the exchange when it is no longer in use 【当这个交换器 已经没有人使用的时候 会自动删除,longer 长时期 当时我不知道长时期是什么时候,所以我会记住是 没人用就自己删掉 自动消失】 * @param internal true if the exchange is internal, i.e. can't be directly * published to by a client. 【不知道什么鬼意思,如果是内部 不会直接发送到客户端,我一般写false ,真心不知道他什么意思,后面学下去应该明白】 * @param arguments other properties (construction arguments) for the exchange 【其他参数 暂时没有用到】 * @return a declaration-confirm method to indicate the exchange was successfully declared * @throws java.io.IOException if an error is encountered 【会抛出IO异常】 */ Exchange.DeclareOk exchangeDeclare(String exchange,String type, boolean durable, boolean autoDelete, boolean internal,Map<String, Object> arguments) throws IOException;
2、消费者创建队列,并绑定队列到交换器当中,上写路由键,让交换器知道这个路由键的消息是跑到这个队列当中的。
JAVA 代码如下【直接给源码的方法定义】:
/** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue 【队列名称】 * @param durable true if we are declaring a durable queue (the queue will survive a server restart) 【持久化,这里特别说一下,如果你想消息是持久化的,必须消息是持久化的,交换器也是持久化的,队列更是持久化的,其中一个不是也无法恢复消息】 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) 【私有的,独有的。 这个队列之后这个应用可以消费,上面的英文注释是 说restricted to this connection 就是限制在这个连接可以消费,就是说不限制channel信道咯,具体没有试过,但是应该是这样,除非备注骗我,我读得书少,你唔好呃我!!!】 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 【没有人使用自动删除】 注意:如果exclusive为true 最好 autodelete都为true 至于为什么 这么简单自己想~ * @param arguments other properties (construction arguments) for the queue 【其他参数没有玩过】 * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;/** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
绑定队列和交换器 并指定路由键 /** * Bind a queue to an exchange. * @see com.rabbitmq.client.AMQP.Queue.Bind * @see com.rabbitmq.client.AMQP.Queue.BindOk * @param queue the name of the queue 【队列名称】 * @param exchange the name of the exchange 【交换器名称】 * @param routingKey the routine key to use for the binding 【路由键】 * @param arguments other properties (binding parameters) 【其他参数 还是那句没有玩过】 * @return a binding-confirm method if the binding was successfully created * @throws java.io.IOException if an error is encountered */ Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
3、然后消费者就去订阅消息咯,注意在JAVA里面的订阅方法会产生线程阻塞的。
JAVA 代码:
订阅消息并消费 /** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag. * @param queue the name of the queue 【所订阅消费的队列】 * @param autoAck true if the server should consider messages 【是否为自动确定消息,好像TCP的ack syn 啊,可怕的7层模型!!!一般写true就可以】 * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param callback an interface to the consumer object 【回调callback 这个马上上代码看看】 * @return the consumerTag generated by the server * @throws java.io.IOException if an error is encountered * @see com.rabbitmq.client.AMQP.Basic.Consume * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) */ String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
接收到消息之后,马上有回调,来看看回调接口:
/** * Interface for application callback objects to receive notifications and messages from * a queue by subscription. * Most implementations will subclass {@link DefaultConsumer}. * <p/> * The methods of this interface are invoked in a dispatch * thread which is separate from the {@link Connection}'s thread. This * allows {@link Consumer}s to call {@link Channel} or {@link * Connection} methods without causing a deadlock. * <p/> * The {@link Consumer}s on a particular {@link Channel} are invoked serially on one or more * dispatch threads. {@link Consumer}s should avoid executing long-running code * because this will delay dispatch of messages to other {@link Consumer}s on the same * {@link Channel}. * * @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer) * @see Channel#basicCancel */ public interface Consumer { /** * Called when the consumer is registered by a call to any of the * {@link Channel#basicConsume} methods. * @param consumerTag the <i>consumer tag</i> associated with the consumer */ void handleConsumeOk(String consumerTag); /** * Called when the consumer is cancelled by a call to {@link Channel#basicCancel}. * @param consumerTag the <i>consumer tag</i> associated with the consumer */ void handleCancelOk(String consumerTag); /** * Called when the consumer is cancelled for reasons <i>other than</i> by a call to * {@link Channel#basicCancel}. For example, the queue has been deleted. * See {@link #handleCancelOk} for notification of consumer * cancellation due to {@link Channel#basicCancel}. * @param consumerTag the <i>consumer tag</i> associated with the consumer * @throws IOException */ void handleCancel(String consumerTag) throws IOException; /** * Called when either the channel or the underlying connection has been shut down. * @param consumerTag the <i>consumer tag</i> associated with the consumer * @param sig a {@link ShutdownSignalException} indicating the reason for the shut down */ void handleShutdownSignal(String consumerTag, ShutdownSignalException sig); /** * Called when a <code><b>basic.recover-ok</b></code> is received * in reply to a <code><b>basic.recover</b></code>. All messages * received before this is invoked that haven't been <i>ack</i>'ed will be * re-delivered. All messages received afterwards won't be. * @param consumerTag the <i>consumer tag</i> associated with the consumer */ void handleRecoverOk(String consumerTag); /** * Called when a <code><b>basic.deliver</b></code> is received for this consumer. * @param consumerTag the <i>consumer tag</i> associated with the consumer * @param envelope packaging data for the message * @param properties content header data for the message * @param body the message body (opaque, client-specific byte array) * @throws IOException if the consumer encounters an I/O error while processing the message * @see Envelope 宝宝累了 看重点,这个方法就是消息到达的时候回调的方法其他你们喜欢研究 可以认真看看英文备注。 */ void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException; }
注意一下:其实也可以使用其他订阅方式去消费消息的,可以使用get的方式去获得一条消息,然后会在队列当中给你一条消息,但是你会想就不需要使用上面的订阅方法啦,直接跑个循环就可以啦。其实get方法会开启订阅获得一条消息后关闭订阅,这样会产生不必要的性能开销的,除非老板让你搞卡一点,让客户掏点优化费,不然就别这样干了。get的方法这里就先不介绍啦~
4、上面所说的都是消息消费者的工作,这个step开始说消息生产者要做的。开启一个交换器,当然这个交换器应该是存在不用创建的因为 上面所说消费者已经创建过了,但是其实谁去先创建都是可以的,消费者也可以创建,生产者也可以,这个倒是没有什么关系,主要是看你的业务需求【这句话甩锅用】。由于已经贴出过创建交换器的代码所以就不贴了,看上面。
与上面的交换器创建一样。不贴代码了自己拖上去看吧~
5、好像没有什么别的事情了,就是产生一个消息然后发过去。上代码
JAVA代码:
/** * Publish a message * @see com.rabbitmq.client.AMQP.Basic.Publish * @param exchange the exchange to publish the message to 【发送到那个交换器上】 * @param routingKey the routing key 【路由键 决定消息去哪个队列里面混】 * @param props other properties for the message - routing headers etc 【其他消息参数, 哎~ 这个我玩过,一会DEMO时间,表演一下】 * @param body the message body 【消息体,一般把对象序列化一下发过去】 * @throws java.io.IOException if an error is encountered */ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
然后就没有然后了 上面 第三步的 订阅方法中的回调 就会有这个消息了。 好又到说概念的时候了。 困的同学先休息。
消息的投递方式,我也困了其实:
【1】 消息MESSAGE_A 到达 queue_A 队列
【2】 RabbitMQ 把 MESSAGE_A 发送到 APPLICATION A(消费者)
【3】 APPLICATION A 确定收到了消息 发个ack 高兴一下
【4】 RabbitMQ 把 消息从 queue_A 队列删除。
上面其实也有说过,autoack参数 有印象吧~ 所以autoack 设置成true 就没错了。这里就可以开展其他话题了,如果多个订阅者怎么办呢,是不是多个订阅者都会收到消息呢,答案是否定的。RabbitMQ的队列会对订阅者做一个循环,例如目前有两个订阅者订阅了同一个队列,serverA 和 serverB 他会循环去发送消息,队列有 ID 1-10 的消息,ID 1消息会由 serverA 处理 ID 2 消息会由 serverB 处理 ID 3 消息会由 serverA 去处理 如此类推。
如果serverA 在发送ACK应答给 RabbitMQ之前 断开连接【就是服务挂了,宕机了】RabbitMQ 会认为这个消息没有处理,即没有消费到这个消息,会把这个消息发送到 serverB 去处理。
而且在消息没有ACK之前,RabbitMQ不会发你第二条消息的,所以如果你想等待消息的任务处理完之后再给第二个消息的话,可以将autoDelete设置成false,这样你就可以在消息处理完之后再去ack。���样也是一个非常通用的场景,以防扎堆处理消息。
其实这样的设计也是非常好的,如果你出现了业务错误,执行消息处理的时候,刚好出现问题了,你可以通过断开连接的方式【这里所指的断开是断开RabbitMQ的connection】让这个消息交给下个订阅者,【这里说一下,如果队列没有消费者去订阅消息的话,消息会存在RabbitMQ当中等待有消费者去订阅再去发送】,当然这也是在你没有ack的情况下。在高级版本的RabbitMQ中可以使用reject命令,让这个消息直接传递到下个订阅者。【在RabbitMQ2.0可以使用】
AMQP信道channel与订阅
如果 一个channel 订阅了一个队列就不能订阅别的队列了,也就是说一个channel只能订阅一个队列。所以你需要取消原来的订阅,并将信道设置为“传输”模式。后面有时间写后面的文章可能会说到。
交换器的类型:
direct :这个简单,只要路由键一模一样就投递到相应的队列当中。
fanout :这个也简单,消息会投递到所有绑定到这个交换器的队列当中。
topic :这个就复杂一点了,我很客观的简单就简单 复杂就复杂 从不忽悠。但是这个也好常用不能不学,好累。在路由键当中可以使用通配符。怎么说呢,好纠结啊 不知道怎么解释啊 怎么办 在线等。
举个例子吧,绑定队列:
//绑定队列 路由键是 tony.teamA 有路由键为tony.teamA的消息就投递到这里 this.channel.queueBind("queueA","topic_exchangeA","tony.teamA",null); //绑定队列 路由键是 yan.teamA 有路由键为tony.teamB的消息就投递到这里 this.channel.queueBind("queueB","topic_exchangeA","yan.teamA",null); //绑定队列 路由键是 *.teamA 有路由键为.teamA结尾的消息就投递到这里 this.channel.queueBind("queueC","topic_exchangeA","*.teamA",null); //绑定队列 路由键是 chao.teamB 有路由键为chao.teamB的消息就投递到这里 this.channel.queueBind("queueD","topic_exchangeA","chao.teamB",null); //绑定队列 路由键是 *.teamB 有路由键为.teamB结尾的消息就投递到这里 this.channel.queueBind("queueE","topic_exchangeA","*.teamB",null); //绑定队列 路由键是 # 所有在这个交换器的消息都投递到这里 this.channel.queueBind("queueF","topic_exchangeA","#",null); // queueA[路由键:tony.teamA]、queueC[路由键:*.teamA]、queueF[路由键:#]会收到消息 this.channel.basicPublish("topic_exchangeA","tony.teamA",properties,SerializationUtils.serialize(msg)); // queueD[路由键:chao.teamB]、queueE[路由键:*.teamB]、queueF[路由键:#]会收到消息 this.channel.basicPublish("topic_exchangeA","chao.teamB",properties,SerializationUtils.serialize(msg));
然后基本的rabbitMQ 的东西就搞好了。不过这只是开始~
简单说一下虚拟主机和隔离吧,简单来说想MySQL 你可以创建很多个库,里面有很多个表。然后呢 rabbitMQ可以创建很多个虚拟主机,虚拟主机里面有很多个交换器,交换器里面有很多个队列,解释得完美。默认会提供一个 默认虚拟主机 vhost : “/”。后面找时间再说这个 vhost 吧。
重头大戏上DEMO,由于方便我阅读回忆,所以我忽略的封装性,一切以容易快速看懂和回忆为目标。别喷~
生产者:
package com.maxfunner; import com.maxfunner.mq.EndPoint; import com.rabbitmq.client.*; import org.apache.commons.lang.SerializationUtils; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Producer */ public class Producer { private Connection connection; private Channel channel; private Map<Long, String> messageMap = new HashMap<Long, String>(); private int maxID = 0; private static final String EXCHANGE_NAME = "MY_EXCHANGE"; public void createConnectionAndChannel() throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //服务器地址 factory.setUsername("guest"); //默认用户名 factory.setPassword("guest"); //默认密码 factory.setPort(5672); //默认端口,对就是这么屌全部默认的 this.connection = factory.newConnection(); //创建链接 this.channel = this.connection.createChannel(); } public void initChannelAndCreateExchange() throws IOException { this.channel.confirmSelect(); //启用消息确认已经投递成功的回调 /** * 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数 */ this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null); this.channel.addConfirmListener(new ConfirmListener() { /** * 成功的时候回调【这个是当消息到达交换器的时候回调】 * @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。 * @param multiple * @throws IOException */ public void handleAck(long deliveryTag, boolean multiple) throws IOException { String message = messageMap.get(deliveryTag); System.out.println("message : " + message + " ! 发送成功"); messageMap.remove(message); //最后一个消息都搞掂之后 关闭所有东西 if (deliveryTag >= maxID) { closeAnything(); } } /** * 失败的时候回调 * @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。 * @param multiple * @throws IOException */ public void handleNack(long deliveryTag, boolean multiple) throws IOException { String message = messageMap.get(deliveryTag); System.out.println("message : " + message + " ! 发送失败"); messageMap.remove(message); //发送失败就不重发了,发脾气 //最后一个消息都搞掂之后 关闭所有东西 if (deliveryTag >= maxID) { closeAnything(); } } }); } public void sendMessage(String message) throws IOException { AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .contentType("text/plain") //指定是一个文本 .build(); // 发送一个消息 到 EXCHANGE_NAME 的交换器中 路由键为 KEY_A 发送 message 之前序列化一下 具体用什么包上面import自己看 this.channel.basicPublish(EXCHANGE_NAME, "KEY_A", properties, SerializationUtils.serialize(message)); } public void closeAnything() throws IOException { this.channel.close(); //跪安吧 小channel this.connection.close(); //你也滚吧 connection } public static void main(String[] args) throws IOException { Producer producer = new Producer(); producer.createConnectionAndChannel(); producer.initChannelAndCreateExchange(); List<String> messageList = new ArrayList<String>(); messageList.add("message_A"); messageList.add("message_B"); messageList.add("message_C"); messageList.add("message_D"); messageList.add("message_E"); messageList.add("message_F"); producer.maxID = messageList.size(); //记录最后一个ID 当最后一个消息发送成功后关闭连接 //注意:因为channel产生的ID 是从1开始的 for (int i = 1; i <= messageList.size(); i++) { producer.messageMap.put(new Long(i), messageList.get(i - 1)); //这里看懂了吗?没看懂也没有办法了,这里我真不知道怎么解释 producer.sendMessage(messageList.get(i - 1)); } } }
消费者:
package com.maxfunner; import com.maxfunner.mq.QueueConsumer; import com.rabbitmq.client.*; import org.apache.commons.lang.SerializationUtils; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * Consumer */ public class Consumer { private Connection connection; private Channel channel; private Map<Integer,String> messageMap = new HashMap<Integer, String>(); private static final String EXCHANGE_NAME = "MY_EXCHANGE"; /** * 对,你猜得一点都没有错,我是复制的 * @throws IOException */ public void createConnectionAndChannel() throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //服务器地址 factory.setUsername("guest"); //默认用户名 factory.setPassword("guest"); //默认密码 factory.setPort(5672); //默认端口,对就是这么屌全部默认的 this.connection = factory.newConnection(); //创建链接 this.channel = this.connection.createChannel(); } public void createAndBindQueue() throws IOException { /** * 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数 */ this.channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,true,null); //最好也创建一下交换器,反正已经创建也没有关系 /** * 创建了一个队列, 名称为 QUEUE_A 非持久化 非独有的 自动删除的 没有额外删除的 */ this.channel.queueDeclare("QUEUE_A",false,false,true,null); this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A"); } public static void main(String args[]) throws IOException { final Consumer consumer = new Consumer(); consumer.createConnectionAndChannel(); consumer.createAndBindQueue(); System.out.println("等待消息中。。。。"); new Thread(new Runnable() { public void run() { try { /** * 订阅消息,订阅队列QUEUE_A 获得消息后自动确认 */ consumer.channel.basicConsume("QUEUE_A", true, new com.rabbitmq.client.Consumer() { public void handleConsumeOk(String consumerTag) { } public void handleCancelOk(String consumerTag) { } public void handleCancel(String consumerTag) throws IOException { } public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } public void handleRecoverOk(String consumerTag) { } public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("message : " + SerializationUtils.deserialize(body)); } }); } catch (IOException e) { e.printStackTrace(); } } }).start(); } }
生产者运行结果:
message : message_A ! 发送成功 message : message_B ! 发送成功 message : message_C ! 发送成功 message : message_D ! 发送成功 message : message_E ! 发送成功 message : message_F ! 发送成功
消费者运行结果:
等待消息中。。。。 message : message_A message : message_B message : message_C message : message_D message : message_E message : message_F
项目我是用maven创建的贴一下maven 的pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.maxfunner</groupId> <artifactId>rabbitlearning</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitlearning</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.0.4</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.1</version> </dependency> </dependencies> </project>
最后一个重点问题
消息“黑洞”,如果在没有绑定队列和交换器之前,所有发出的消息都无法匹配到相应的队列当中,那些消息将永远不会被消费。而且confirm的callback也是会返回成功的即使消息进入了消息“黑洞”。所以在发送消息之前 必须确定队列已经绑定,确保消息能分配到相应的队列当中。 测试很简单,上面的DEMO 先运行 Producer 显示所有消息发送成功,然后再运行 Consumer 发现没有消息可以接收。 再尝试先运行Consumer 再运行 Producer 就发现一切都正常了,这也是为什么我把autoDelete设置为true的原因,有了autoDelete当队列没有人用的时候就会自动删除。所以每次运行都可以测试出问题。要保证消息能够到达指定的队列最好也在Producer中建立队列 而且进行相关的绑定 然后再发送消息 修改一下 Producer 的其中一方法即可。 不写了 累了~
public void initChannelAndCreateExchange() throws IOException { this.channel.confirmSelect(); //启用消息确认已经投递成功的回调 /** * 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数 */ this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null); this.channel.addConfirmListener(new ConfirmListener() { /** * 成功的时候回调【这个是当消息到达交换器的时候回调】 * @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。 * @param multiple * @throws IOException */ public void handleAck(long deliveryTag, boolean multiple) throws IOException { String message = messageMap.get(deliveryTag); System.out.println("message : " + message + " ! 发送成功"); messageMap.remove(message); //最后一个消息都搞掂之后 关闭所有东西 if (deliveryTag >= maxID) { closeAnything(); } } /** * 失败的时候回调 * @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。 * @param multiple * @throws IOException */ public void handleNack(long deliveryTag, boolean multiple) throws IOException { String message = messageMap.get(deliveryTag); System.out.println("message : " + message + " ! 发送失败"); messageMap.remove(message); //发送失败就不重发了,发脾气 //最后一个消息都搞掂之后 关闭所有东西 if (deliveryTag >= maxID) { closeAnything(); } } }); /** * 创建了一个队列, 名称为 QUEUE_A 非持久化 非独有的 自动删除的 没有额外删除的 */ this.channel.queueDeclare("QUEUE_A",false,false,true,null); this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");
RabbitMQ 的详细介绍:请点这里
RabbitMQ 的下载地址:请点这里