消息中间件(九)-----RabbitMq消息可靠传递
本文参考自:【RabbitMQ】如何进行消息可靠投递【上篇】
Channel相关API
channel.exchangeDeclare()
/** * Declare an exchange. * @see com.rabbitmq.client.AMQP.Exchange.Declare * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk * @param exchange the name of the exchange * @param type the exchange type * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) * @param autoDelete true if the server should delete the exchange when it is no longer in use * @param arguments other properties (construction arguments) for the exchange * @return a declaration-confirm method to indicate the exchange was successfully declared * @throws java.io.IOException if an error is encountered */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
type:有direct、fanout、topic三种
durable:true、false true:服务器重启会保留下来Exchange。警告:仅设置此选项,不代表消息持久化。即不保证重启后消息还在。原文:true if we are declaring a durable exchange (the exchange will survive a server restart)
autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange。原文1:true if the server should delete the exchange when it is no longer in use。
chanel.basicQos()
/** * Request specific "quality of service" settings. * * These settings impose limits on the amount of data the server * will deliver to consumers before requiring acknowledgements. * Thus they provide a means of consumer-initiated flow control. * @see com.rabbitmq.client.AMQP.Basic.Qos * @param prefetchSize maximum amount of content (measured in * octets) that the server will deliver, 0 if unlimited * @param prefetchCount maximum number of messages that the server * will deliver, 0 if unlimited * @param global true if the settings should be applied to the * entire channel rather than each consumer * @throws java.io.IOException if an error is encountered */ void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize:0
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别
备注:据说prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究
channel.basicPublish()
/** * Publish a message. * * Publishing to a non-existent exchange will result in a channel-level * protocol exception, which closes the channel. * * Invocations of <code>Channel#basicPublish</code> will eventually block if a * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect. * * @see com.rabbitmq.client.AMQP.Basic.Publish * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>. * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param mandatory true if the ‘mandatory‘ flag is to be set * @param immediate true if the ‘immediate‘ flag is to be * set. Note that the RabbitMQ server does not support this flag. * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.IOException if an error is encountered */ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消
息入队列等待消费者了。
channel.basicAck()
/** * Acknowledge one or several received * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method * containing the received message being acknowledged. * @see com.rabbitmq.client.AMQP.Basic.Ack * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} * @param multiple true to acknowledge all messages up to and * including the supplied delivery tag; false to acknowledge just * the supplied delivery tag. * @throws java.io.IOException if an error is encountered */ void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true)
/** * Reject one or several received messages. * * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected. * @see com.rabbitmq.client.AMQP.Basic.Nack * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} * @param multiple true to reject all messages up to and including * the supplied delivery tag; false to reject just the supplied * delivery tag. * @param requeue true if the rejected message(s) should be requeued rather * than discarded/dead-lettered * @throws java.io.IOException if an error is encountered */ void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false)
/** * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method * containing the received message being rejected. * @see com.rabbitmq.client.AMQP.Basic.Reject * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered * @throws java.io.IOException if an error is encountered */ void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列
channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
channel.basicConsume(QUEUE_NAME, true, consumer)
/** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag. * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param callback an interface to the consumer object * @return the consumerTag generated by the server * @throws java.io.IOException if an error is encountered * @see com.rabbitmq.client.AMQP.Basic.Consume * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) */ String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
autoAck:是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答
chanel.exchangeBind()
/** * Bind an exchange to an exchange, with no extra arguments. * @see com.rabbitmq.client.AMQP.Exchange.Bind * @see com.rabbitmq.client.AMQP.Exchange.BindOk * @param destination the name of the exchange to which messages flow across the binding * @param source the name of the exchange from which messages flow across the binding * @param routingKey the routine key to use for the binding * @return a binding-confirm method if the binding was successfully created * @throws java.io.IOException if an error is encountered */ Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
用于通过绑定bindingKey将queue到Exchange,之后便可以进行消息接收
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
/** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
durable:true、false true:在服务器重启时,能够存活
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列。this means that the queue will be deleted when there are no more processes consuming messages from it.
消息可靠传递
可靠传递
先来说明一个概念,什么是可靠投递呢?在RabbitMQ中,一个消息从生产者发送到RabbitMQ服务器,需要经历这么几个步骤:
- 生产者准备好需要投递的消息。
- 生产者与RabbitMQ服务器建立连接。
- 生产者发送消息。
- RabbitMQ服务器接收到消息,并将其路由到指定队列。
- RabbitMQ服务器发起回调,告知生产者消息发送成功。
所谓可靠投递,就是确保消息能够百分百从生产者发送到服务器。
为了避免争议,补充说明一下,如果没有设置Mandatory参数,是不需要先路由消息才发起回调的,服务器收到消息后就会进行回调确认。
2、3、5步都是通过TCP连接进行交互,有网络调用的地方就会有事故,网络波动随时都有可能发生,不管是内部机房停电,还是外部光缆被切,网络事故无法预测,虽然这些都是小概率事件,但对于订单等敏感数据处理来说,这些情况下导致消息丢失都是不可接受的。
rabbitmq消息可靠投递
默认情况下,发送消息的操作是不会返回任何信息给生产者的,也就是说,默认情况下生产者是不知道消息有没有正确地到达服务器。
那么如何解决这个问题呢?
对此,RabbitMQ中有一些相关的解决方案:
- 使用事务机制来让生产者感知消息被成功投递到服务器。
- 通过生产者确认机制实现。
在RabbitMQ中,所有确保消息可靠投递的机制都会对性能产生一定影响,如使用不当,可能会对吞吐量造成重大影响,只有通过执行性能基准测试,才能在确定性能与可靠投递之间的平衡。
在使用可靠投递前,需要先思考以下问题:
- 消息发布时,保证消息进入队列的重要性有多高?
- 如果消息无法进行路由,是否应该将该消息返回给发布者?
- 如果消息无法被路由,是否应该将其发送到其他地方稍后再重新进行路由?
- 如果RabbitMQ服务器崩溃了,是否可以接受消息丢失?
- RabbitMQ在处理新消息时是否应该确认它已经为发布者执行了所有请求的路由和持久化?
- 消息发布者是否可以批量投递消息?
- 在可靠投递上是否有可以接受的平衡性?是否可以接受一部分的不可靠性来提升性能?
只考虑平衡性不考虑性能是不行的,至于这个平衡的度具体如何把握,就要具体情况具体分析了,比如像订单数据这样敏感的信息,对可靠性的要求自然要比一般的业务消息对可靠性的要求高的多,因为订单数据是跟钱直接相关的,可能会导致直接的经济损失。
所以不仅应该知道有哪些保证消息可靠性的解决方案,还应该知道每种方案对性能的影响程度,以此来进行方案的选择。
RabbitMQ的事务机制
RabbitMQ是支持AMQP事务机制的,在生产者确认机制之前,事务是确保消息被成功投递的唯一方法。
在SpringBoot项目中,使用RabbitMQ事务其实很简单,只需要声明一个事务管理的Bean,并将RabbitTemplate的事务设置为true即可。
配置文件如下:
spring: rabbitmq: host: localhost password: guest username: guest listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: manual
先来配置一下交换机和队列,以及事务管理器。
@Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange"; public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue"; // 声明业务Exchange @Bean("businessExchange") public FanoutExchange businessExchange(){ return new FanoutExchange(BUSINESS_EXCHANGE_NAME); } // 声明业务队列 @Bean("businessQueue") public Queue businessQueue(){ return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build(); } // 声明业务队列绑定关系 @Bean public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } /** * 配置启用rabbitmq事务 * @param connectionFactory * @return */ @Bean public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } }
然后创建一个消费者,来监听消息,用以判断消息是否成功发送。
@Slf4j @Component public class BusinessMsgConsumer { @RabbitListener(queues = BUSINESS_QUEUEA_NAME) public void receiveMsg(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("收到业务消息:{}", msg); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } }
然后是消息生产者:
@Slf4j @Component public class BusinessMsgProducer{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { rabbitTemplate.setChannelTransacted(true); } @Transactional public void sendMsg(String msg) { rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME, "key", msg); log.info("msg:{}", msg); if (msg != null && msg.contains("exception")) throw new RuntimeException("surprise!"); log.info("消息已发送 {}" ,msg); } }
这里有两个注意的地方:
- 在初始化方法里,通过使用
rabbitTemplate.setChannelTransacted(true);
来开启事务。 - 在发送消息的方法上加上
@Transactional
注解,这样在该方法中发生异常时,消息将不会发送。
在controller中加一个接口来生产消息:
@RestController public class BusinessController { @Autowired private BusinessMsgProducer producer; @RequestMapping("send") public void sendMsg(String msg){ producer.sendMsg(msg); } }
来验证一下:
msg:1 消息已发送 1 收到业务消息:1 msg:2 消息已发送 2 收到业务消息:2 msg:3 消息已发送 3 收到业务消息:3 msg:exception Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause java.lang.RuntimeException: surprise! at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30) ...
当 msg
的值为 exception
时, 在调用rabbitTemplate.convertAndSend
方法之后,程序抛出了异常,消息并没有发送出去,而是被当前事务回滚了。
当然,你可以将事务管理器注释掉,或者将初始化方法的开启事务注释掉,这样事务就不会生效,即使在调用了发送消息方法之后,程序发生了异常,消息也会被正常发送和消费。
RabbitMQ中的事务使用起来虽然简单,但是对性能的影响是不可忽视的,因为每次事务的提交都是阻塞式的等待服务器处理返回结果,而默认模式下,客户端是不需要等待的,直接发送就完事了,除此之外,事务消息需要比普通消息多4次与服务器的交互,这就意味着会占用更多的处理时间,所以如果对消息处理速度有较高要求时,尽量不要采用事务机制。
RabbitMQ的生产者确认机制
RabbitMQ中的生产者确认功能是AMQP规范的增强功能,当生产者发布给所有队列的已路由消息被消费者应用程序直接消费时,或者消息被放入队列并根据需要进行持久化时,一个Basic.Ack请求会被发送到生产者,如果消息无法路由,代理服务器将发送一个Basic.Nack RPC请求用于表示失败。然后由生产者决定该如何处理该消息。
也就是说,通过生产者确认机制,生产者可以在消息被服务器成功接收时得到反馈,并有机会处理未被成功接收的消息。
在Springboot中开启RabbitMQ的生产者确认模式也很简单,只多了一行配置:
spring: rabbitmq: host: localhost password: guest username: guest listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: manual publisher-confirms: true
publisher-confirms: true
即表示开启生产者确认模式。
然后将消息生产者的代表进行部分修改:
@Slf4j @Component public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { // rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setConfirmCallback(this); } public void sendCustomMsg(String exchange, String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if (b) { log.info("消息确认成功, id:{}", id); } else { log.error("消息未成功投递, id:{}, cause:{}", id, s); } } }
让生产者继承自RabbitTemplate.ConfirmCallback
类,然后实现其confirm
方法,即可用其接收服务器回调。
需要注意的是,在发送消息时,代码也进行了调整:
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
这里我们为消息设置了消息ID,以便在回调时通过该ID来判断是对哪个消息的回调,因为在回调函数中,我们是无法直接获取到消息内容的,所以需要将消息先暂存起来,根据消息的重要程度,可以考虑使用本地缓存,或者存入Redis中,或者Mysql中,然后在回调时更新其状态或者从缓存中移除,最后使用定时任务对一段时间内未发送的消息进行重新投递。
另外,还需要注意的是,如果将消息发布到不存在的交换机上,那么发布用的信道将会被RabbitMQ关闭。
此外,生产者确认机制跟事务是不能一起工作的,是事务的轻量级替代方案。因为事务和发布者确认模式都是需要先跟服务器协商,对信道启用的一种模式,不能对同一个信道同时使用两种模式。
在生产者确认模式中,消息的确认可以是异步和批量的,所以相比使用事务,性能会更好。
使用事务机制和生产者确认机制都能确保消息被正确的发送至RabbitMQ,这里的“正确发送至RabbitMQ”说的是消息成功被交换机接收,但如果找不到能接收该消息的队列,这条消息也会丢失。至于如何处理那些无法被投递到队列的消息,将会在下篇进行说明。
优化
我们了解了如何保证消息被可靠投递到RabbitMQ的交换机中,但还有一些不完美的地方,试想一下,如果向RabbitMQ服务器发送一条消息,服务器确实也接收到了这条消息,于是给你返回了ACK确认消息,但服务器拿到这条消息一看,找不到路由它的队列,于是就把它丢进了垃圾桶,emmm,我猜应该属于可回收垃圾。
如何让消息可靠投递到队列
如果你对上面的描述还不是很清楚,那我再用代码来说明一次。
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时,生产者是不知道消息被丢弃这个事件的。
我们将上一篇中的交换机类型改为DirectExchange,这样就只有当消息的 RoutingKey 和队列绑定时设置的 Bindingkey (这里即“key”)一致时,才会真正将该消息进行路由。
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange"; public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue"; // 声明业务 Exchange @Bean("businessExchange") public DirectExchange businessExchange(){ return new DirectExchange(BUSINESS_EXCHANGE_NAME); } // 声明业务队列 @Bean("businessQueue") public Queue businessQueue(){ return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build(); } // 声明业务队列绑定关系 @Bean public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key"); }
对消息生产者也稍作修改:
@Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { // rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setConfirmCallback(this); } public void sendCustomMsg(String exchange, String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData); correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if (b) { log.info("消息确认成功, id:{}", id); } else { log.error("消息未成功投递, id:{}, cause:{}", id, s); } }
然后我们调用该方法,发送两条消息测试一下:
消息id:ba6bf502-9381-4220-8dc9-313d6a289a4e, msg:1 消息id:f0040a41-dc02-4e45-b8af-e3cfa8a118b2, msg:1 消息确认成功, id:ba6bf502-9381-4220-8dc9-313d6a289a4e 消息确认成功, id:f0040a41-dc02-4e45-b8af-e3cfa8a118b2 收到业务消息:1
可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。
那么,如何让消息被路由到队列后再返回ACK呢?或者无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。
别慌别慌,RabbitMQ里有两个机制刚好可以解决我们上面的疑问:
1、mandatory 参数 2、备份交换机
mandatory参数
设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
当把 mandotory 参数设置为 true 时,如果交换机无法将消息进行路由时,会将该消息返回给生产者,而如果该参数设置为false,如果发现消息无法进行路由,则直接丢弃。
那么如何设置这个参数呢?在发送消息的时候,只需要在初始化方法添加一行代码即可:
rabbitTemplate.setMandatory(true);
开启之后我们再重新运行前面的代码:
消息id:19729f33-15c4-4c1b-8d48-044c301e2a8e, msg:1 消息id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb, msg:1 消息确认成功, id:19729f33-15c4-4c1b-8d48-044c301e2a8e Returned message but no callback available 消息确认成功, id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb 收到业务消息:1
我们看到中间多了一行提示 Returned message but no callback available
这是什么意思呢?
我们上面提到,设置 mandatory 参数后,如果消息无法被路由,则会返回给生产者,是通过回调的方式进行的,所以,生产者需要设置相应的回调函数才能接受该消息。
为了进行回调,我们需要实现一个接口 RabbitTemplate.ReturnCallback
。
@Slf4j @Component public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(this); } public void sendCustomMsg(String exchange, String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData); correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if (b) { log.info("消息确认成功, id:{}", id); } else { log.error("消息未成功投递, id:{}, cause:{}", id, s); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}", new String(message.getBody()), replyCode, replyText, exchange, routingKey); } }
然后我们再来重新运行一次:
消息id:2e5c336a-883a-474e-b40e-b6e3499088ef, msg:1 消息id:85c771cb-c88f-47dd-adea-f0da57138423, msg:1 消息确认成功, id:2e5c336a-883a-474e-b40e-b6e3499088ef 消息无法被路由,被服务器退回。msg:1, replyCode:312. replyText:NO_ROUTE, exchange:rabbitmq.tx.demo.simple.business.exchange, routingKey :key2 消息确认成功, id:85c771cb-c88f-47dd-adea-f0da57138423 收到业务消息:1
可以看到,我们接收到了被退回的消息,并带上了消息被退回的原因:NO_ROUTE
。但是要注意的是, mandatory 参数仅仅是在当消息无法被路由的时候,让生产者可以感知到这一点,只要开启了生产者确认机制,无论是否设置了 mandatory 参数,都会在交换机接收到消息时进行消息确认回调,而且通常消息的退回回调会在消息的确认回调之前。
备份交换机
有了 mandatory 参数,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。
而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?
前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
不要慌,在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。
什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会将这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
听的不太明白?没关系,看个图就知道是怎么回事了。
接下来,我们就来设置一下备份交换机:
@Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.backup.test.exchange"; public static final String BUSINESS_QUEUE_NAME = "rabbitmq.backup.test.queue"; public static final String BUSINESS_BACKUP_EXCHANGE_NAME = "rabbitmq.backup.test.backup-exchange"; public static final String BUSINESS_BACKUP_QUEUE_NAME = "rabbitmq.backup.test.backup-queue"; public static final String BUSINESS_BACKUP_WARNING_QUEUE_NAME = "rabbitmq.backup.test.backup-warning-queue"; // 声明业务 Exchange @Bean("businessExchange") public DirectExchange businessExchange(){ ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE_NAME) .durable(true) .withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME); return (DirectExchange)exchangeBuilder.build(); } // 声明备份 Exchange @Bean("backupExchange") public FanoutExchange backupExchange(){ ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BUSINESS_BACKUP_EXCHANGE_NAME) .durable(true); return (FanoutExchange)exchangeBuilder.build(); } // 声明业务队列 @Bean("businessQueue") public Queue businessQueue(){ return QueueBuilder.durable(BUSINESS_QUEUE_NAME).build(); } // 声明业务队列绑定关系 @Bean public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key"); } // 声明备份队列 @Bean("backupQueue") public Queue backupQueue(){ return QueueBuilder.durable(BUSINESS_BACKUP_QUEUE_NAME).build(); } // 声明报警队列 @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(BUSINESS_BACKUP_WARNING_QUEUE_NAME).build(); } // 声明备份队列绑定关系 @Bean public Binding backupBinding(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } // 声明备份报警队列绑定关系 @Bean public Binding backupWarningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } }
这里我们使用 ExchangeBuilder
来创建交换机,并为其设置备份交换机:
.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
为业务交换机绑定了一个队列,为备份交换机绑定了两个队列,一个用来存储不可投递消息,待之后人工处理,一个专门用来做报警用途。
接下来,分别为业务交换机和备份交换机创建消费者:
@Slf4j @Component public class BusinessMsgConsumer { @RabbitListener(queues = BUSINESS_QUEUE_NAME) public void receiveMsg(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("收到业务消息:{}", msg); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } }
@Slf4j @Component public class BusinessWaringConsumer { @RabbitListener(queues = BUSINESS_BACKUP_WARNING_QUEUE_NAME) public void receiveMsg(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.error("发现不可路由消息:{}", msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
接下来我们分别发送一条可路由消息和不可路由消息:
@Slf4j @Component public class BusinessMsgProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendCustomMsg(String exchange, String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData); correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData); } }
消息如下:
消息id:5c3a33c9-0764-4d1f-bf6a-a00d771dccb4, msg:1 消息id:42ac8c35-1d0a-4413-a1df-c26a85435354, msg:1 收到业务消息:1 发现不可路由消息:1
这里仅仅使用 error 日志配合日志系统进行报警,如果是敏感数据,可以使用邮件、钉钉、短信、电话等报警方式来提高时效性。
那么问题来了,mandatory 参数与备份交换机可以一起使用吗?设置 mandatory 参数会让交换机将不可路由消息退回给生产者,而备份交换机会让交换机将不可路由消息转发给它,那么如果两者同时开启,消息究竟何去何从??
emmm,想这么多干嘛,试试不就知道了。
修改一下生产者即可:
@Slf4j @Component public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { // rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(this); } public void sendCustomMsg(String exchange, String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData); correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if (b) { log.info("消息确认成功, id:{}", id); } else { log.error("消息未成功投递, id:{}, cause:{}", id, s); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}", new String(message.getBody()), replyCode, replyText, exchange, routingKey); } }
再来测试一下:
消息id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4, msg:1 消息id:d8c9e010-e120-46da-a42e-1ba21026ff06, msg:1 消息确认成功, id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4 消息确认成功, id:d8c9e010-e120-46da-a42e-1ba21026ff06 发现不可路由消息:1 收到业务消息:1
可以看到,两条消息都可以收到确认成功回调,但是不可路由消息不会被回退给生产者,而是直接转发给备份交换机。可见备份交换机的处理优先级更高。