ActiveMQ中消费者是如何接收消息的(一)
事先说明,本博客关于ActiveMQ的文章都是基于ActiveMQ5.10版本。
初步用过ActiveMQ但又没去研究过源码的朋友肯定有些好奇ActiveMQ中消费者是如何接收消息的呢?本文我就和大家一起从源码角度来初步探讨消费者接收消息的过程。
我们知道,消息传送有两种模型:点对点(P2P)和发布订阅(PUB/SUB),队列模式中,消息生产者叫做发送者,消息消费者叫做接收者,而在发布订阅模式中,消息生产者叫发布者,消息消费者叫订阅者。点对点模型中队列(Queue)是消息发送和接收的途径和通道,他保证了一个消息最多只能被一个消费者消费,而发布订阅模型中,消息发送和接收的途径是主题(Topic),所有订阅主题的消费者,都可以接收到该主题发布的消息,所以在这个模型中,消息可以被多个消费者消费。
1)我们先来看看在点对点模型中消费者是如何接收消息的
如果直接使用过ActiveMQ API的朋友,一定知道消息接收者可以通过两种方式接收消息,一种是使用同步效果的MessageConsumer#receive() 和异步的使用消息监听器的MessageConsumer#setMessageListener(MessageListener listener) 。值得注意的是,在同一个org.apache.activemq.ActiveMQSession会话对象下面的消费者,如果有的是采用消息监听器接收消息,则那些采用同步receive() 接收消息的消费者会抛出 IllegalStateException("Cannot synchronously receive a message when a MessageListener is set")异常,也就是说,同一个Session下面,要么消费者都使用消息监听器,要么都使用receive() 同步接收。
这是为什么呢?我们先看下org.apache.activemq.ActiveMQMessageConsumer同步接收的源代码:
@Override public Message receive() throws JMSException { checkClosed(); checkMessageListener(); sendPullCommand(0); // 如果预取数为0,则主动向JMS服务器发送拉取消息的报文 MessageDispatch md = dequeue(-1); if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); // 给JMS服务器发送接收消息的应答报文 return createActiveMQMessage(md); // 取出消息副本并返回 }
上面的checkMessageListener()就是去做检查的,请看:
protected void checkMessageListener() throws JMSException { // 去调用所属会话的checkMessageListener();方法 session.checkMessageListener(); }
而ActiveMQSession中的源码如下:
public void checkMessageListener() throws JMSException { if (messageListener != null) { throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); } // 遍历由会话创建的消费者中是否有绑定消息监听器的消费者,如果有,则抛异常。 for (Iterator i = consumers.iterator(); i.hasNext();) { ActiveMQMessageConsumer consumer = i.next(); if (consumer.hasMessageListener()) { throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); } } }
如上所示,checkMessageListener() 调用的是该消费者所属会话的checkMessageListener()方法,而会话中的checkMessageListener()方法正是去该会话下面查看所有的消费者看看是否有采用消息监听的,如果有,则立马抛出IllegalStateException异常。至于ActiveMQ为什么要这样限制,第一是为了防止一个消费者同时采用同步和消息监听器两种方式接收消息,第二就是这样导致了无法采用一致的消息分发方式来将该会话接收到的消息合理的分配给下面的消费者,第三就是如果是事务性会话,采用两种方式的消费者是无法管理的。当然,如果你需要采用同步和异步消息接收共存,那也很简单,你只要通过ActiveMQConnection创建两个会话,一个会话下面创建的消费者都是采用同步接收,另一个会话下面创建的消费者都是采用异步接收就行了。
下面,我们来看看采用receive() 的内部是如何工作的。 这里,我们先来了解一下org.apache.activemq.ActiveMQMessageConsumer中几个重要的成员属性:
protected final MessageDispatchChannel unconsumedMessages;// 未消费的消息通道,里面用来储存未消费的消息,该通道容纳的最大消息数为预取值
protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();// 分发给该消费者但未应答的消息链表,列表中的消息顺序和被消费的顺序是相反的。
private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages; // 为了对TX的完整性进行验证,我们需要对一个事务中的消息重复发送进行跟踪。
这里,我们先给出receive()方法的源码:
@Override public Message receive() throws JMSException { checkClosed(); // 检查unconsumedMessages是否关闭 checkMessageListener(); // 检查是否有其他消费者使用了消息监听器 sendPullCommand(0); // 向JMS提供者发送一个拉取命令来拉取消息,为下次消费做准备 MessageDispatch md = dequeue(-1); // 从unconsumedMessages取出一个消息 if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); return createActiveMQMessage(md); }
在ActiveMQ中,通过会话创建一个消费者时,就会为这个消费者创建一个未消费的消息通道,该通道分为两种,如果你采用的是优先级队列,则创建的是SimplePriorityMessageDispatchChannel()简单优先级消息分发的通道,如果不是,则创建的是FifoMessageDispatchChannel()先进先出的分发通道,如果你要问为什么需要有这个东西,第一,消费者处理消息是需要时间的,如果每次处理完一条消息才告知Session我处理完了,你再给我一个,这对于快消费者来说,效率是极低的,所以你得允许Session能够一次性将多条消息分给一个消费者,还记得“预取consumer.prefetchSize”的特性吗?Session将某条消息发送到这个消费者时,会先把消息放入属于这个消费者的未消费的消息通道中,我们每调用一次消费者的receive() 方法,首先要做的是就是去检查这个通道是否被关闭,如果被关闭,则会抛出IllegalStateException("The Consumer is closed");异常,第二步才是去调用上面提到的方法去检查是否有采用消息监听器接收消息的其他消费者“哥们”,如果通过了这两项检查,接下来要做的就是异步向MOM发送一个pull命令消息来拉取消息(注意,只有在预取prefetchSize设置为0且未消费的消息通道unconsumedMessages中已经没消息了才会发送pull命令消息,因为只有这时才需要告诉JMS提供者,消费者我已经把消息处理完了,你得赶紧再给我发一批,当然这个命令的发送过程是异步的,这也是为什么采用receive接收消息可以设置预取为0的原因),在发送这个命令之前,客户端会先清理已分发消息链表deliveredMessages,这一步的处理分为两种,1.Session是非事务的,如果Session的应答模式是CLIENT_ACKNOWLEDGE,也就是需要客户端的消费者主动调用Message#acknowledge()来应答MOM,由于我们这里讨论的是队列,所以只是简单的将deliveredMessages给清空而已(如果是基于主题的,会去遍历deliveredMessages给每个消息调用ActiveMQConnection#rollbackDuplicate做重复回滚处理);如果Session应答模式不是CLIENT_ACKNOWLEDGE,则不管是队列还是主题,都只是清空deliveredMessages而已。2.Session是事务的,则会将遍历deliveredMessages中的消息放入previouslyDeliveredMessages中来为重发做准备,源码如下,false表示还未进行过重发。
for (MessageDispatch delivered : deliveredMessages) { previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); }
接着,消费者就会直接从unconsumedMessages取出一个消息,从上面的源码可以看出,传入的时间毫秒参数是-1,所以表示如果unconsumedMessages为空将一直阻塞,如果想设置超时时间,可以使用如下方法同步接收消息:
public Message receive(long timeout) throws JMSException;
timeout==0表示一点也不阻塞,直接返回,如果是大于零的值则最多阻塞设置的值的毫秒数。
阻塞取消息这一步走完,如果获得的消息分发对象MessageDispatch不为空,这如上面的源代码,将执行beforeMessageIsConsumed(md);方法,如该方法名所示,该方法主要做消费消息前的准备工作,如果应答模式不是DUPS_OK_ACKNOWLEDGE或者是队列模式,则将该消息分发对象放入deliveredMessages列表的开头;如果Session是事务的,则(这里呆会在补充)。接下来调用的afterMessageIsConsumed(md, false);的主要作用是应答MOM,所以,当这个方法执行完,你就可以通过MQ的控制台看到该消息已经在“Messages Dequeued”中了。最后的createActiveMQMessage(md);作用就更简单了,直接从md对象中取出消息的副本进行返回,这样,消息接收者客户端就完成了一条消息的同步接收。
接着,我们来看看采用消息监听器是如何接收消息的。 消费者可以调用public void setMessageListener(MessageListener listener) throws JMSException;方法来给自己设置一个消息监听器,下面给出源码:
@Override public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); if (info.getPrefetchSize() == 0) { throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); } if (listener != null) { boolean wasRunning = session.isRunning(); if (wasRunning) { session.stop(); } this.messageListener.set(listener); session.redispatch(this, unconsumedMessages); if (wasRunning) { session.start(); } } else { this.messageListener.set(null); } }
注意看加粗部分代码,可以看出,采用消息监听器接收消息的消费者,预取数必须大于0,JMS给出的说法是异步消费者不支持。我们来一行行分析代码,该方法首先的工作和采用同步接收消息的方法一样去检查unconsumedMessages是否关闭,如果没有关闭,且listener不为空,则看会话Session是否已经Running,在ActiveMQSession中,有一个叫started的AtomicBoolean,他在Session调用自己的启动方法start()方法时会设置成true,而session.isRunning()方法返回的正是此值,下面给出start()方法的源码:
protected void start() throws JMSException { started.set(true); for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { ActiveMQMessageConsumer c = iter.next(); c.start(); } executor.start(); }
可以看出,该方法不是公用的,因为默认是在ActiveMQSession构造函数中调用的:
if (connection.isStarted()) {
start();
}
有人会感到奇怪,我在通过ActiveMQConnection创建ActiveMQSession之前并没有调用ActiveMQConnection的start()方法啊,所以Session的构造函数里面也并没有启动Session自己啊?不用着急,因为你随后调用的ActiveMQConnection的start()方法里面也会去调用Session的start()方法,源码如下:
@Override public void start() throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); if (started.compareAndSet(false, true)) { for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { ActiveMQSession session = i.next(); session.start(); } } }
Connection在启动时会主动去遍历其下创建的Session,挨个让Session启动。经常使用JMSAPI的人应该知道,如果Connection没有调用start()方法时,即使队列中有消息,该Connection下面的消费者都是无法获取到该消息的(发消息不同,即使Connection没有启动,消息发送者仍然可以发送消息到JMS服务器),这下你们都知道原因了吧。好,回归正题,如果发现Session已经启动,它会主动去“关闭”该会话,这是当然的,ActiveMQ得保证该会话下面所有消费者都做好消息接收准备工作再启动自己。所以,如果我们直接使用ActiveMQ的API,最好是所有工作都做好后,再去调用ActiveMQConnection的start()方法。再保证了此时Session没有启动后,很显然我们得保存这个listener,因为我们后面还会去调用它。接着是session.redispatch(this, unconsumedMessages);,这是去消费该消费者unconsumedMessages中遗留的消息并将unconsumedMessages清空,因为我们是新创建的消费者,所以这一步就根本什么也没做。接着,如果Session是刚开始是启动的,由于刚才我们关闭过,所以我们会再次去启动它。这样,设置消息监听器的工作就作完了。