使用Spring和ActiveMQ实现高效的轻量级的JMS

转载自:http://windows9834.blog.163.com/blog/static/27345004201311841121335/

这是官方推荐要看的原文的翻译:http://codedependents.com/2009/10/16/efficient-lightweight-jms-with-spring-and-activemq/

异步性(asynchronicity),是高伸缩性系统的首要的 设计 原则,对Java来说,这意味着JMS,然后又意味着ActiveMQ。但是我如何高效的使用JMS呢?一个人很有可能会不知所措,在谈论容器、框架、和一堆选项时。那么让我们分开来探讨下。

框架

ActiveMQ文档中提及两个框架:CamelSpring。那么要做决定就取决于简单性和功能性的对比。Camel支持大量的企业集成模式(Enterprise Integration Patterns),它可以大大简化集成组件间的大量服务和复杂的消息流。如果你的系统需要这样的功能,它当然是非常好的选择。然而,如果你寻找简单性,仅仅支持基本的最佳实践,那么Spring是好的选择。

JCA (使用它或放弃它)

阅读ActiveMQ的Spring支持时,开发者立刻被介绍了JCA容器和ActiveMQ很多的代理和适配器选项的概念。然而这些都不适应。JCA是EJB规范的一部分,对于大部分EJB规范,Spring都没有提供很好的支持。这时,就提到了Jencks,”为Spring提供的轻量级JCA容器”,它可以作为ActiveMQ的JCA容器。 乍看上去,这是个非常好的解决方案,但是我在这里告诉你不要这样做。Jencks最后更新时间是2007年1月。那时ActiveMQ的版本是 4.1x,Spring的版本是2.0.x,但是时间过了很多,事情也变化了很大。甚至是试图从Maven库中获取ActiveMQ4.1的依赖包 Jencks都会失败,因为它已经不存在了。简单的事实是有更好和更简单的方式来缓存资源。

发送消息

Spring消息发送的核心架构是JmsTemplate。在传统的Spring模板方式中,JmsTemplate隔离了像打开、关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑。然而ActiveMQ快速的指出了JmsTemplate gotchas,大多是因为JmsTemplate被设计在每次调用时都打开和关闭session及producer。JmsTemplate损害了ActiveMQ的PooledConnectionFactory对session和消息producer的缓存机制而带来的性能提升。然而,这个太过时了。从Spring2.5.3开始,它开始提供自己的CachingConnectionFactory,我相信该类在缓存方面更加强大。然而,这里有一个点需要注意。默认情况下,CachingConnectionFactory只缓存一个session,在它的JavaDoc中,它声明对于低并发情况下这是足够的。与之相反,PooledConnectionFactory的默认值是500。这些设置,在很多情况下,需要亲自去测试并验证。我将其设置为100,对我来说还是很不错。

接收消息

可能你已经注意了,JmsTemplate gotchas强 烈的不建议你使用JmsTemplate的receive()调用,同样也是因为没有session和consumer的池机制。更多的原因是在 JmsTemplate上的所有调用都是同步的,这意味着调用线程需要被阻塞,直到方法返回。这在使用JmsTemplate发送消息时,没有任何问题, 因为该方法几乎是立即返回。然而,当调用receive方法时,线程将阻塞,直到接收到一个消息为止,这对性能影响很大。不幸的是,JmsTemplate gotchasSpring支持文档中都没有对该问题提供简单的解决方案。实际上,它们均提倡使用Jencks,我们之前已经说过。实际的解决方案,使用DefaultMessageListenerContainer,已经写在了how do I use JMS efficiently文 档中。DefaultMessageListenerContainer允许异步接收消息并缓存session和消息consumer。更有趣的 是,DefaultMessageListenerContainer可以根据消息数量动态的增加或缩减监听器的数量。简而言之,我们可以完全忽视 JCA。

合并起来

Spring Context配置文件

<? xml version ="1.0" encoding ="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd">
<!-- enables annotation based configuration -->
<context:annotation-config/>
<!-- scans for annotated classes in the com.company package -->
<context:component-scanbase-package="net.javasight"/>
<!--
allows for ${} replacement in the spring xml configuration from the
system.properties file on the classpath
-->
<context:property-placeholderlocation="classpath:system.properties"/>
<!-- creates an activemq connection factory using the amq namespace -->
<amq:connectionFactoryid="amqConnectionFactory"
brokerURL="${jms.url}"username="${jms.username}"password="${jms.password}"/>
<!--
CachingConnectionFactory Definition, sessionCacheSize property is the
number of sessions to cache
-->
<beanid="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-argref="amqConnectionFactory"/>
<propertyname="exceptionListener"ref="jmsExceptionListener"/>
<propertyname="sessionCacheSize"value="100"/>
</bean>
<!-- JmsTemplate Definition -->
<beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
<constructor-argref="connectionFactory"/>
</bean>
<!--
listener container definition using the jms namespace, concurrency is
the max number of concurrent listeners that can be started
-->
<jms:listener-containerconcurrency="10">
<jms:listenerid="QueueListener"destination="Queue.Name"
ref="queueListener"/>
</jms:listener-container>
</beans>

这里有两点需要注意。首先,我添加了amq和jms的命名空间。第二,我使用了Spring2.5的基于注解的配置。通过使用基于注解的配置,我可以简单的添加@Component注解到我的Java类上,而不是使用XML文件显式的在spring配置文件中定义它们。另外,我可以在我的构造方法中使用@Autowired来将像JmsTemplate这样的对象注入到我的对象中。

QueueSender

package net.javasight;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
publicclassQueueSender{
privatefinalJmsTemplate jmsTemplate;
@Autowired
publicQueueSender(finalJmsTemplate jmsTemplate){
this.jmsTemplate = jmsTemplate;
}

publicvoid send(finalString message){
                jmsTemplate.convertAndSend("Queue.Name", message);
}
}

Queue Listener

package net.javasight;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;

@Component
publicclassQueueListenerimplementsMessageListener{
publicvoid onMessage(finalMessage message){
if(message instanceofTextMessage){
finalTextMessage textMessage =(TextMessage) message;
try{
System.out.println(textMessage.getText());
}catch(finalJMSException e){
e.printStackTrace();
}
}
}
}

JmsExceptionListener

package net.javasight;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.springframework.stereotype.Component;

@Component
publicclassJmsExceptionListenerimplementsExceptionListener{

publicvoid onException(finalJMSException e){
e.printStackTrace();
}
}

相关推荐