如何解决MQ消息消费顺序问题
通常mq可以保证先到队列的消息按照顺序分发给消费者消费来保证顺序,但是一个队列有多个消费者消费的时候,那将失去这个保证,因为这些消息被多个线程并发的消费。但是有的时候消息按照顺序处理是很重要的,那我们该如何来保证消息的顺序呢,下面将从activemq和rocketmq来看看,它们是如何来保证消息的顺序问题的?我们还可以有别的处理方案么?
Activemq处理方案
1、利用Activemq的高级特性:consumer之独有消费者(exclusive consumer)
在ActiveMQ4.x中可以采用Exclusive Consumer,broker会从queue中,一次发送消息给一个消费者,这样就避免了多个消费者并发消费的问题,从而保证顺序,配置如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue);
- 当在接收信息的时候有一个或者多个备份接收消息者和一个独占消息者的同时接收时候,无论两者创建先后,在接收的时候,均为独占消息者接收。
- 当在接收信息的时候,有多个独占消费者的时候,只有一个独占消费者可以接收到消息。
- 当有多个备份消息者和多个独占消费者的时候,当所有的独占消费者均close的时候,只有一个备份消费者接到到消息。
- 当主消费者挂了话,会立刻启用故障切换转移到下一台消费者继续消费
图1
独占消息就是在有多个消费者同时消费一个queue时,可以保证只有一个消费者可以消费消息,这样虽然保证了消息的顺序问题,不过也带来了一个问题,就是这个queue的所有消息将只会在这一个主消费者上消费,其他消费者将闲置,达不到负载均衡分配,而实际业务我们可能更多的是这样的场景,比如一个订单会发出一组顺序消息,我们只要求这一组消息是顺序消费的,而订单与订单之间又是可以并行消费的,不需要顺序,因为顺序也没有任何意义,有没有办法做到呢?答案是可以的,下面就来看看activemq的另一个高级特性之messageGroup。
2、利用Activemq的高级特性:messageGroups
Message Groups特性是一种负载均衡的机制。在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group。如果没有,那么broker会选择一个consumer,并将它关联到这个message group。此后,这个consumer会接收这个message group的所有消息,直到:
- Consumer被关闭
- Message group被关闭,通过发送一个消息,并设置这个消息的JMSXGroupSeq为-1
图2
配置如下:
bytesMessage.setStringProperty("JMSXGroupID", "constact-20100000002"); bytesMessage.setIntProperty("JMSXGroupSeq", -1);
如上图所示,同一个queue中,拥有相同JMSXGroupID的消息将发往同一个消费者,解决顺序问题,不同分组的消息又能被其他消费者并行消费,解决负载均衡的问题,两全其美啦!
- 问题讨论,除了上述Activemq为我们提供的方案,我们是否不依赖这两种特性,也能解决顺序问题呢?
Rocketmq处理方案
那rocketmq又是如何保证消息顺序消费的问题呢?
Rocketmq跟传统的MQ有一点区别,这里有必要讲一下topic的概念,Topic是RocketMQ中的一个重要概念,RocketMQ的各组件都是围绕着Topic建立起对应关系的,在RocketMQ官方文档和本文中, Topic在不同的语境下被赋予了两种不同的语义:
1)消息的Topic属性值:在描述Consumer的订阅设置信息或消息的属性时。
2)Topic属性为某个值的消息(单个消息或消息集合):在描述Broker,Producer和Consumer的对应关系,Queue以及负载均衡策略时。
topic和queue的对应关系是一个topic拥有多个queue,当producer往broken发送消息时,消息会存储在topic下的不同队列中,而一个队列只会被一个consumer消费,这样消息户被均衡负载到不同的队列下,也就是会被多个消费者并行消费,顺序就无法保证了。该怎么办呢?答案是把需要顺序消费的消息发送到同一台broker server下的同一个队列,而这些消息也只会被同一个消费者消费,这样就可以保证严格的顺序了,如下图:
1、消息要有顺序,首先得保证producer发送消息有顺序,如上图msg1,msg2,msg3发送到queue0是要有顺序的,只要producer等待前面的消息发送成功,在发送后面的消息就完全可以保证了,
2、假设msg1发送给consumer1,消费没有响应,该怎么办呢?是继续发送msg2还是重新发送msg1,一般为了保证消息一定被消费,肯定会选择重发msg1到下一台消费者consumer2。
3、消费端1没有响应Server时有两种情况,一种是msg1确实没有到达(数据在网络传送中丢失),另外一种消费端已经消费msg2且已经发送响应消息,只是MQ Server端没有收到。如果是第二种情况,重发msg1,就会造成msg1被重复消费。也就引入了消息重复问题,那就要幂等了。
Rocketmq同样做到了保证消息的顺序情况下,均衡消费的消费消息。
综上,我看到,在分布式系统中,要想消息有顺序的被消费,无论是Activemq还是Rocketmq都要想办法让有顺序的消息被同一消费者消费,而不是并发的消费,在消费者消费成功后,接着才会消费下一个消息,这样就可以保证了严格的顺序。