RabbitMQ(二) RabbitMQ高级特性
消息如何保障100%的投递成功
什么是生产端的可靠性投递
- 保障消息的成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(Borker)确认应答
- 完善的消息进行补偿机制
生产端-可靠性投递(一)
- 消息落库,对消息状态进行打标
- 消息的延迟投递,做二次确认,回调检查
生产端-可靠性投递(二)
消息落库,对消息状态进行打标
消息落库,对消息进行打标(对消息设置状态,发送中,broker收到,)
定时器轮训,检测未发送的消息,进行二次投递,最大努力尝试(设置最大次数)
- step1 消息落库(唯一的消息id) ,一定是数据库入库成功以后在进行发送消息
- step2 发送消息 到MQ Broker
- step3 Broker Confirm (发送消息确认)
- step4 生产者ConfirmListener (异步监听,Broker回送的响应)
- step5 成功,通过messageId更新消息状态
补偿
分布式定时任务,抓取数据(超过第一时长),尝试重发,重试次数限制
生产端-可靠性投递(三)
消息的延迟投递,做二次确认,回调检查 (最大限度的减少消息落库)
方案一在高并发场景下,每次消息落库,影响性能(IO操作)
step1: 业务消息落库 ,一定是数据库入库成功以后在进行发送消息
step2:第一次消息的发送
step3:延迟消息的检测
step4:监听,处理完,生成一条新消息
step5:通过队列发送,确认 不是之前的ack
幂等性概念
幂等性是什么
借鉴数据库的乐观锁机制
执行一条更新SQL
update t_reps set count=count-1,version=version+1 where verison=1
消费端幂等性保障
在业务高峰期,如何避免消息的重复消费问题
消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即时收到多条一样的消息。
幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/how-to-ensure-that-messages-are-not-repeatedly-consumed.md
业界主流的幂等性操作
- 唯一ID+指纹码机制,利用数据库主键去重
- 利用Redis的原子性实现
方案一:
唯一ID+指纹码机制
- 唯一ID+指纹码机制,利用数据库主键去重
- select count(1) from t_order where id=唯一id+指纹码
- 好处:实现简单
- 坏处:高并发下有数据库写入的性能瓶颈
- 解决方案:跟进ID进行分库分表进行路由算法
方案二:利用Redis的原子性实现
Confirm确认消息
理解Confirm消息确认机制:
- 消息的去人,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答
- 生产者进行接收应该,用来确定这条消息是否正常的发送到broker,这种方式也是消息的可靠性投递的核心保障
confirm确认消息流程解析
confirm确认消息实现
- 第一步:在channel上开启确认模式: channel.sconfirmSelect()
- 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。
示例
生产者
/** * @author niugang */ public class Producer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 获取C onnection Connection connection = connectionFactory.newConnection(); //3 通过Connection创建一个新的Channel Channel channel = connection.createChannel(); //4 指定我们的消息投递模式: 消息的确认模式 channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.save"; //5 发送一条消息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); //6 添加一个确认监听 channel.addConfirmListener(new ConfirmListener() { //失败 // deliveryTag 消息唯一标签 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------no ack!-----------"); } //成功 @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ack!-----------"); } }); } }
消费者
/** * @author niugang */ public class Consumer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 获取C onnection Connection connection = connectionFactory.newConnection(); //3 通过Connection创建一个新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.#"; String queueName = "test_confirm_queue"; //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key channel.exchangeDeclare(exchangeName, "topic", true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5 创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费端: " + msg); } } }
Return消息机制
- Return Listener 用于处理一些不可路由的消息。
- 我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中取,然后我们的消费者监听队列,进行消费处理操作。
- 但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener
配置
在基础API上有一个关键的配置项
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。
流程
生产者
/** * @author niugang */ public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_return_exchange"; String routingKey = "return.save"; String routingKeyError = "abc.save"; String msg = "Hello RabbitMQ Return Message"; channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("---------handle return----------"); //响应码 312 System.err.println("replyCode: " + replyCode); //NO_ROUTE System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); } }
消费者
/** * @author niugang */ public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "return.#"; String queueName = "test_return_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费者: " + msg); } } }
消费者自定义监听
- 我们一般就是在代码中编写while循环,进行consumer.nextDelivery方法获取下一条消息,然后进行消费处理!
- 但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!
实现方式
自定义类,继承 DefaultConsumer
生产者
/** * @author niugang */ public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_consumer_exchange"; String routingKey = "consumer.save"; String msg = "Hello RabbitMQ Consumer Message"; for(int i =0; i<5; i ++){ channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } } }
自定义消费者
/** * 自定义消费者 * @author niugang */ public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); //消费标签 System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
消费者
/** * @author niugang */ public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_consumer_exchange"; String routingKey = "consumer.#"; String queueName = "test_consumer_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); channel.basicConsume(queueName, true, new MyConsumer(channel)); } }
消费端限流
什么是消费端的限流
- 假设一个场景,首先,我们RabbitMQ服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:
- 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据;
- RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息(autoAck为false)的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
- void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);
- prefetchSize:0 #这里为0表示不限制
- prefetchCount: 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack; (prefetchCount等于1即可)
- global:true\false 是否将上面设置应用于channel
- 简单来说,就是上面限制是channel级别的还是consumer级别;
生产者
/** * @author niugang */ public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhosy"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_qos_exchange"; String routingKey = "qos.save"; String msg = "Hello RabbitMQ QOS Message"; for(int i =0; i<5; i ++){ channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } } }
自定义消费者
public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); //ack 注释掉后 控制台只会接收到一条消息 channel.basicAck(envelope.getDeliveryTag(), false); } }
消费者
/** * @author niugang */ public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "qos.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //1 限流方式 第一件事就是 autoAck设置为 false //prefetchCount broker 给 消费者 最大推送消息数量 channel.basicQos(0, 1, false); //手工签收 channel.basicConsume(queueName, false, new MyConsumer(channel)); } }
消息ACK与重回队列
消费端的手工ACK和NACK
- 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿
- 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功
消费端的重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker (requeue属性设置)
一般我们在实际应用中,都会关闭重回队列,也就是设置为false;
生产者
/** * ack 测试生产者 * @author niugang */ public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_ack_exchange"; String routingKey = "ack.save"; for(int i =0; i<5; i ++){ Map<String, Object> headers = new HashMap<String, Object>(); headers.put("num", i); //设置消息属性 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2).expiration("1000") .contentEncoding("UTF-8") .headers(headers) .build(); String msg = "Hello RabbitMQ ACK Message " + i; channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } } }
自定义消费者
/** * 自定义消费者 * @author niugang */ public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("body: " + new String(body)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if((Integer)properties.getHeaders().get("num") == 0) { //multiple 是否是批量 //requeue 重新添加到队列尾部 channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }
消费者
/** * 消费者 * @author niugang */ public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_ack_exchange"; String queueName = "test_ack_queue"; String routingKey = "ack.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); // 手工签收 必须要关闭 autoAck = false channel.basicConsume(queueName, false, new MyConsumer(channel)); } }
TTL队列/消息
TTL
- TTL是Time To Live的缩写,也就是生存时间
- RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。
- RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的删除。
在控制台创建队列
在控制台创建exchange 并添加binding 然后发送消息,然后在队列页面可以看到消息自动被队列剔除
原生API设置TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("1000").build();
Spring AMQP设置TTL
MessageProperties messageProperties = new MessageProperties(); //消息过期时间 messageProperties.setExpiration("1000"); Message stringMessage = new Message("Hello Springboot RabbitMQ".getBytes(), messageProperties);
死信队列
死信队列:DLX ,Dead-Letter-Exchange
- 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX;
消息变成死信有以下几种情况:
- 消息被拒绝(basic.reject/basic.nack) 并且requeue=false;
- 消息TTL过期
- 队列达到最大长度
死信队列详细解释
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能;
死信队列具体设置
step1:首先需要设置死信队列的exchange和queue,然后进行绑定
例如定义如下exchange和queue
- Exchange:dlx.echange
- Queue:dlx.queue
- RoutingKey:#
step2:
然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put(“x-dead-letter-exchange”,“dlx.exchange”);
这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列。
生产者
/** * 私信队列 生产端 * * @author niugang */ public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //自定义普通的exchange String exchange = "test_dlx_exchange"; String routingKey = "dlx.save"; String msg = "Hello RabbitMQ DLX Message"; for (int i = 0; i < 1; i++) { AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") //过期时间为10s .expiration("10000") .build(); channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } } }
自定义消息消费
/** * 自定义消息消费 * * @author niugang */ public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
生产者
/** * 死信队列消费端 * * @author niugang */ public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 这就是一个普通的交换机 和 队列 以及路由 String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); Map<String, Object> agruments = new HashMap<String, Object>(16); //设置死信队列exchange 这些具体的参数可以通过rabbitmq控制台查看 agruments.put("x-dead-letter-exchange", "dlx.exchange"); //这个agruments属性,要设置到声明队列上 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); //要进行死信队列的声明: channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); channel.queueBind("dlx.queue", "dlx.exchange", "#"); channel.basicConsume(queueName, true, new MyConsumer(channel)); } }