ActiveMQ高级特性
大纲
(一)、异步投递与确认签收回调
1.What?Why?
ActiveMQ 支持同步、异步两种发送的模式将消息发送到 broker,模式的选择对发送延时有巨大的影响。producer 能达到怎样的产出率(产出率 = 发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。
ActiveMQ 默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞 producer 直到 broker 返回一个确认,表示消息己经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。
很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
异步投递可以最大化 produer 端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升 Producer 性能;
不过这也带来了额外的问题,就是需要消耗较多的 Client 端内存同时也会导致 broker 端性能消耗增加;此外它不能有效的确保消息的发送成功。在 useAsyncSend=true 的情况下客户端需要容忍消息丢失的可能。
异步发送如何确认发送成功?
异步发送丢失消息的场景是:生产者设置 UseAsyncSend=true,使用 producer.send(msg)发送消息
由于消息不阻塞,生产者会认为所有 send 的消息均被成功发送至 MQ。如果 MQ 突然宕机,此时生产者端内存中尚未被发送至 MQ 的消息都会丢失。
所以,正确的异步方法是需要接收回调的
同步发送和异步发送的区别就在此,同步发送等send不阻塞了就表示一定发送成功了,异步发送需要接收回执并由客户端再判断一次是否发送成功。
1.How?
1)非SpringBoot项目中
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true"); //使用连接URI配置异步发送 ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);//在ConnectionFactory 级别配置异步发送 ((ActiveMQConnection)connection).setUseAsyncSend(true); //在连接级别配置异步发送
1)SpringBoot项目中
@Component public class QueueProducer { @Autowired private JmsMessagingTemplate template; @Autowired private Queue queue; @Autowired private ActiveMQConnection connection; /** * 消息同步发送: 在消息发送成功到MQ中之前,Producer端一直处于阻塞状态,消息的发送成功与否可从阻塞为阻塞看出 * 1.强制要求同步发送 2.事务持久化发送时 * 消息异步发送: 消息发出后,MQ中未收到消息之前宕机了,但Producer没有显示发送成功与否,会认为发送成功 * 此时,为避免这种情况-->有异步发送的回调机制,发送成功或失败后会给Producer回调结果 */ public void produceMessage() {//template.getConnectionFactory().createConnection().createSession(); /*进行复杂应用设置时,可从template中获取connection,进行原始的操作,但此时使用了pool的session缓存spring.jms.cache.enable默认为true, 返回的Connection为CacheConnectionFactory,无法生成ActiveMQMessageProducer,无法进行send方法的AsyncCallback回调 ,所以此时需要创建一个ActiveMQMessageConnection的配置类以支持异步的调用并成功回调*/ try { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue); ObjectMessage message = session.createObjectMessage(); message.setStringProperty("msgId", "001"); message.setObject(new User(1, "小明")); producer.send(message, new AsyncCallback() { @Override public void onSuccess() { try { System.out.println("消息发送成功:" + message.getStringProperty("msgId")); } catch (JMSException e) { e.printStackTrace(); } } @Override public void onException(JMSException e) { try { System.out.println("出现异常:" + message.getStringProperty("msgId")); } catch (JMSException je) { je.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } //template.convertAndSend(queue,"message:"+new Date().toLocaleString()); }
public class ActiveMQConnectionConfig { @Value("${spring.activemq.broker-url}") private String brokerUrl; @Bean public ActiveMQConnection activeMQConnection() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl); //启用异步发送消息 activeMQConnectionFactory.setUseAsyncSend(true); ActiveMQConnection connection = (ActiveMQConnection) activeMQConnectionFactory.createConnection(); return connection;
(二)、延时投递与定时投递
1.What?
5.4版的ActiveMQ在ActiveMQ消息代理中内置了一个可选的持久性调度程序。通过在“ Xml配置”中将broker schedulerSupport属性设置为true 可以启用此功能。ActiveMQ客户端可以通过使用以下消息属性来利用延迟传递。
2.How?
先在 activemq.xml 中配置 schedulerSupport 属性为 true
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${ activemq. data}" schedulerSupport="true" />
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,10*1000); //延时10秒发送 message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,10); //重复投递10次 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,5*1000); //重复投递的间隔时间:5秒 message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); //使用Corn表达式
重要说明 cron不是spring里面的定义任务表达式而是操作系统的定任务表达式,因为当前的mq安装在Linux 测试时要使用linux的表达式
CRON表达式的优先级高于另外三个参数,如果在设置了CRON的同时,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔为period。就是说设置是叠加的效果。例如每小时都会发生消息被投递10次,延迟1秒开始,每次间隔1秒
https://blog.csdn.net/mengzuchao/article/details/81172305
(三)、消费重试机制
1.What?Why?
activeMQ中的消息重发,指的是消息可以被broker重新分派给消费者,不一定的之前的消费者。重发消息之后,消费者可以重新消费。消息重发的情况有以下几种。
1.事务会话中,当还未进行session.commit()时,进行session.rollback(),那么所有还没commit的消息都会进行重发。
2.使用客户端手动确认的方式时,还未进行确认并且执行Session.recover(),那么所有还没acknowledge的消息都会进行重发。
3.所有未ack的消息,当进行session.closed()关闭事务,那么所有还没ack的消息broker端都会进行重发,而且是马上重发。
4.消息被消费者拉取之后,超时没有响应ack,消息会被broker**重发**。
重发指的是消息经过broker重新进行转发给消费者,经过测试,1和2的情况消息重发会发送给原来的消费者,3和4可以转发消息给别的消费者。累计次数超过设置的maximumRedeliveries时消息都会都会进入死信队列。
消息的重发时间间隔和重发次数
间隔 1 次数 6 (6次之后进入死信队列)
2.How?
Java
Spring
Sprigboot
(四)、死信队列(DLQ)
http://activemq.apache.org/message-redelivery-and-dlq-handling.html
1.What?Why?
DLQ-死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息。
就是一条消息现次被重发了多次后(默认6次),将会被activemq移入"死信队列"。开发人员可以在这个Queue中查看处理茁错的消息,进行人工处理
1.How?(应用场景)
订单生成过程中发送消息到队列中,待物流、仓储系统进行下一步处理 , 此时消息的签收设置为手动签收(即业务成功处理进行签收)
如果物流系统和仓储系统处理消息的过程中出现错误,无法进行签收,此时根据重发机制,MQ会将消息进行重发,而如果此时后续的这些系统出现问题(崩溃、宕机),多次重发全部未成功,当到达重发次数(默认6次),将会将消息放入死信队列
? 进入死信队列的消息的处理方式
这些后续系统,仓储,物流等会有一个后台线程监控自身的运行状况 , 一旦系统恢复运作,便会调用抽取出死信队列中的消息进行重新处理
多个应用,不同的Destination使用不同的死信队列名称 ?
queuePrefix==代表死信队列的前前缀 最后的名字为DLQ.队列名
userQueueForTopicMessages 表示是否把Topic的DealLetter保存到Queue中,默认为true
(五)、防止重复调用 (网络延时传输导致消息重复调用问题的解决)
http://activemq.apache.org/redelivery-policy
网络延时传输中,会造成进行MQ重试。在重试过程中。可能会有重复消费的问题。
此时,根据消息的幂等性,非幂等性进行抉择 ? 类同于Dubbo远程调用的重试机制retries/faileover
幂等性操作 : 删、改、查
非幂等性操作 : 增 ? 给这个消息做一个唯一主键,那么就算出理重复消费的情况。就会有主键冲突,避免数据库出现脏数据。
如果不是这种情况,准备一个第三方服务来做消费记录。如Redis,给消息分配一个全局的ID,只要消费过该消息,把<id,Message>发K-V形式写入redis.那消费者开始消费前,先从云中查询有没有消费记录就可以了。