ActiveMq 的高级特性

ActiveMQ高级特性

异步发送

消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send()方法会被阻塞,直到broker发送一个确认消息给生产者,这个确认消息暗示生产者broker已经成功地将它发送的消息路由到目标目的并把消息保存到二级存储中。这个过程通常称为同步发送。但有一个例外,当发送方法在一个事物上下文中时,被阻塞的是commit方法而不是send方法。commit方法成功返回意味着所有的持久消息都以被写到二级存储中。

同步发送持久消息能够提供更好的可靠性,但这潜在地影响了程序的相应速度,因为在接受到broker的确认消息之前应用程序或线程会被阻塞。如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到broker的确认之前一直阻塞Producer.send方法。如果想启动异步传送可以把connectoruri的jms.useAsyncSend选项设为true,如下所示:

tcp://localhost:61616?jms.useAsyncSend=true

从ActiveMQ5开始可以控制异步发送流。也就是说,在受到broker的确认应答之前,生产者能够传送消息给broker的最大信息量。即使是异步发送消息,生产者也是在收到broker的确认应答后才把下一条消息传送给broker。当使用异步传送的时候,可以设置jms.producerWindowSize(单位为字节)属性,当生产者中等待发送的信息量到达设置的值时,即使没有收到broker的应答消息,生产者同样会把这些消息发给broker。如下面的示例设置:

tcp://localhost:61616?jms.useAsyncSend=true&jms.producerWindowSize=1024000

单独确认

在ActiveMQ5.2中添加了一个新的确认模式,这种确认模式是特定于ActiveMQ的,jms规范暂时并不支持这种确认模式。这种确认模式由ora.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE表示,用来确认一个单独的消息。这中确认模式是相对于Session.CLIENT_ACKNOWLEDGE的,在CLIENT_ACKNOWLEDGE模式下,调用消息的acknowledge()方法会确认由此session消费的所有消息,而在INDIVIDUAL_ACKNOWLEDGE模式下,仅会确认调用acknowledge()方法的消息。

企业集成模式

通过ApacheCamel,ActiveMQ支持《EIP》一书中提到的企业集成模式。参见http://activemq.apache.org/enterprise-integration-patterns.html。

消息游标

在ActiveMQ的之前版本中,broker会把正在传输的消息保存在内存中。使用这种内存模型,当一个消费者消费消息的速度跟不上生产者生产消息的速度的时候,会是broker内存中维护的正在传输的消息数量迅速增长,最终到达最大限额。当到达此最大限额后,broker就不能接受来自客户端的消息,这样生产者就会被阻塞直到broker的内存中有保存消息的空间为止。

从5.0版本开始,ActiveMQ实现了一种新的内存模型以防止慢速的消费者阻塞运行速度更快的生产者。这种内存模型使用了消息游标,详情请查看http://activemq.apache.org/message-cursors.html。

对spring的支持

请查看ActiveMQ的springsupport页面,查看如何在spring中配置ActiveMQ的jms客户端。

连接池

org.apache.activemq.pool包中提供了一个服务提供者对象PooledConnectionFactory,通过这个类应用程序可以缓存Connection、Session以及MessageProducer。更详细的信息可以查看PooledConnectionFactory的文档。

PooledConnectionFactory主要用来与其它一些框架或工具集成使用,例如spring。

消息转换器

从ActiveMQ5开始,ActiveMQ允许开发人员把他们的消息转换对象添加到ActiveMQ的消息总线上。一个ActiveMQ转换对象(或者是转换器)必须实现org.apache.activemq.MessageTransformaer接口。

如果要设置一个消息转换器,可以调用一下对象的setTransformaer()方法设置一个消息转换器:ActiveMQConnectionFactory、ActiveMQConnection、ActiveMQSession、ActiveMQMessageConsumer和ActiveMQMessageProducer。消息转化器是可继承的,也就是说当设置了ActiveMQConenctionFactory的消息转化器后,由此ActiveMQConnectionFacttory创建的连接、会话、消费者和生产者都会继承ActiveMQConnectionFactory的消息转换器。

组合目的

组合目的是ActiveMQ特有的一个特性,jms规范并不支持。组合目的是指多个物理目的被映射到一个虚目的,当生产者向虚目的发送一条消息时,这条消息会被转发到映射到此虚目的地物理目的上。组合目的分为客户端组合目的以及消息代理端组合目的。

客户端组合目的

在客户端可以通过jndi给一个虚目的配置多个物理目的。如下面的jndi.properties示例文件所示,生产者向目的Q.BLAST发送一条消息后,这条消息会被转发到物理目的Q.REQ、Q.FOO和Q.TEST。

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory

connectionFactoryNames=local

connection.local.brokerURL=tcp://localhost:61616

queue.Q.BLAST=Q.REQ,Q.FOO,Q.TEST

在客户端组合目的中也可以混合使用主题和队列。如果一个虚目的映射的物理目的中既有队列又有主题,那么必须使用queue://或topic://前缀。如下所示,在上面示例的基础上为虚目的Q.BLAST添加了一个主题。

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory

connectionFactoryNames=local

connection.local.brokerURL=tcp://localhost:61616

queue.Q.BLAST=Q.REQ,Q.FOO,Q.TEST,topic://TOPIC.TEST

消息代理端组合目的

在broker的xml配置文件中使用元素<destinationInterceptors>可以配置组合目的。如下示例配置片段所示:

<brokerbrokername="mybroker"xmlns="http://activemq.org/config/1.0">

....

<desinationInterceptors>

<virualDestinationInterceptors>

<virtualDesinations>

<compositeQueuename="Q.BLAST">

<forwardTo>

<queuephysicalname="Q.REQ"/>

<queuephysicalname="Q.FOO"/>

<queuephysicalname="Q.TEST"/>

<topicphysicalname="TOPIC.TEST"/>

</forwardTo>

</compositeQueue>

</virtualDestinations>

</virtualDestinationInterceptors>

</destinationInterceptors>

</broker>

镜像队列

镜像队列特性允许应用程序监控通过队列的消息流。如果启用镜像队列,那么发送到某个队列的消息会被发布到一个主题中,因此对通过队列传递的消息感兴趣的应用程序就可以订阅相应的主题。例如,应用程序中有生产者向队列Q.TEST发送消息,同样有一些消费者接收此队列中的消息。假设现在希望能够监控通过这个主题的消息,这可以通过镜像队列来实现,监控程序则可以监听主题VirtualTopic.Mirror.Q.TEST,这样就能接收到发送到主题Q.TEST上的所有消息。

默认情况下没有启用镜像队列功能,如果希望启用这个特性可以设置brokerxml配置文件中<broker>元素的属性useMirroredQueues为true。

相关推荐