ActiveMQ与Spring线程池的整合

转自:http://www.g4studio.org/thread-880-1-1.html

ActiveMQ与Spring线程池的整合在企业级开发中,很多系统间的通信,特别是与外部系统间的通信,往往都是异步的,JMS便是J2EE应用程序中用于处理异步消息传递的接口。为了提高对外部消息的相应,服务器程序中往往利用线程技术来处理接收的消息,线程池的意义在于对这样的一个并行处理机制进行性能上的优化。为了迅速切入正体,这里就不多涉及JMS的内容与池的概念。仅对如何进行ActiveMQ与Spring线程池整合做较为详细的描述。 引用:

ActiveMQ与Spring线程池整合实例 

CSDN资源下载仅有47k

整合步骤

这里我按照配置流程逐点来描述进行配置的方法。其中MQ整合配置的过程中各bean之间的关系比较多,也比较晕,我用橙红色将他们标记出来,关注标记的几点是十分重要的。

让Spring支持ActiveMQ的配置语法

首先我们需要在applicationContext.xml中引入ActiveMQ的配置语法。

  1. <beans xmlns="http://www.springframework.org/schema/beans"  
  2. xmlns:amq="http://activemq.org/config/1.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd
  5.               http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">  
复制代码
  1. <beans xmlns="http://www.springframework.org/schema/beans"        xmlns:amq="http://activemq.org/config/1.0"        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd          http://activemq.org/config/1.0 http://people.apache.org/reposit ... ubator-SNAPSHOT.xsd">
复制代码

由于是SnapShot版本,那个XSD有部分错误,我们这里使用的是自行修改过的XSD,之后将activemq-core-4.1-incubator-SNAPSHOT.xsd文件放入META-INF文件夹下。并且新建spring的自定义scheam的配置文件spring.schemas:

  1. http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd=/activemq-core-4.1-incubator-SNAPSHOT.xsd  
复制代码

http\://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd=/activemq-core-4.1-incubator-SNAPSHOT.xsd这样在应用程序上下文中就可以解析ActiveMQ的配置语法了。

注:实例中是3个文件的配置方法,只不过这3个文件是由activemq-core-4.1-incubator-SNAPSHOT.xsd分解开来的,spring.schemas中也需要将这3个文件都配置进去。

配置ActiveMQ基础部件

为了使ActiveMQ能够正常运行起来,我们需要在ApplicationContext.xml中进行配置。

  1. //配置ActiveMQ Broker   
  2. <amq:brokeruseJmx="false"persistent="false">
  3. <amq:destinations>
  4. <amq:queueid="jms.log"physicalname="pams.amq"/>
  5. </amq:destinations>
  6. <amq:transportConnectors>
  7. <amq:transportConnectoruri="tcp://localhost:61616"/>
  8. </amq:transportConnectors>
  9. </amq:broker>
  10. //配置ConnectionFactory
  11. <amq:connectionFactoryid="jmsConnectionFactory"
  12. brokerURL="vm://localhost:61616"/>
  13. //配置Queue
  14.     <amq:queue name="destination" physicalname="pams.amq" />
复制代码

//配置ActiveMQBroker<amq:brokeruseJmx="false"persistent="false"><amq:destinations><amq:queueid="jms.log"physicalname="pams.amq"/></amq:destinations><amq:transportConnectors><amq:transportConnectoruri="tcp://localhost:61616"/></amq:transportConnectors></amq:broker>//配置ConnectionFactory<amq:connectionFactoryid="jmsConnectionFactory"brokerURL="vm://localhost:61616"/>//配置Queue<amq:queuename="destination"physicalname="pams.amq"/>在这里使用了内嵌JVM这种最简单的模式,这样在Spring初始化时ActiveMQ便加载了。

对于ActiveMQ的配置,我们还需要配置消息生产者、消息消费者和消息转换者等等部分。不过,在此之前我们需要先将线程池配置进来,之后再进行ActiveMQ余下的配置。

配置Spring线程池

对于线程池的配置,我们需要配置一个线程池执行器。首先看下配置代码:

  1. <!-- ThreadPool Executor -->   
  2. <beanid="threadPoolExecutor"class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  3. <!--<propertyname="threadFactory"ref="threadFactory"/>-->
  4. <propertyname="corePoolSize"value="2"/>
  5. <propertyname="maxPoolSize"value="4"/>
  6. <propertyname="queueCapacity"value="500"/>
  7. <propertyname="keepAliveSeconds"value="300"/>
  8. <FONTcolor=red>
  9. <propertyname="rejectedExecutionHandler">
  10. <beanclass="java.util.concurrent.ThreadPoolExecutor$DiscardOldestPolicy"/>
  11. </property>
  12. </FONT>
  13. </bean>  
复制代码

<!--ThreadPoolExecutor--><beanid="threadPoolExecutor"class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><!--<propertyname="threadFactory"ref="threadFactory"/>--><propertyname="corePoolSize"value="2"/><propertyname="maxPoolSize"value="4"/><propertyname="queueCapacity"value="500"/><propertyname="keepAliveSeconds"value="300"/><propertyname="rejectedExecutionHandler"><beanclass="java.util.concurrent.ThreadPoolExecutor$DiscardOldestPolicy"/></property></bean>这里同样仅做了一个简单的配置,需要注意的是对rejectedExecutionHandler属性的配置,这里使用了JDK1.5中用于被拒绝任务的处理机制,在JDK1.5中有4种处理机制,都可以在这里以这种方式配置使用。

消息转换器的配置

完成线程池执行器的配置后,我们需要配置一个消息转换器(Converter),这样在生产和消费时可以直接发送Order对象,而不是JMS的Message对象。

  1. //消息转换器   
  2.     <bean id="resourceMessageConverter" class="com.jmspool.jms.ResourceMessageConverter" />  
复制代码

//消息转换器<beanid="resourceMessageConverter"class="com.jmspool.jms.ResourceMessageConverter"/>

jmsTemplate的配置

之后我们需要配置一个由Spring提供的Template,其绑定ConnectionFactory与消息转换器(Converter)。这样在进行消息的生产时我们可以直接使用Spring提供的Template中的方法。

  1. <!-- JMS ActiveMQ Config-->   
  2. <beanid="jmsTemplate"
  3. class="org.springframework.jms.core.JmsTemplate">
  4. //连接工厂
  5. <propertyname="connectionFactory">
  6. <bean
  7. class="org.springframework.jms.connection.SingleConnectionFactory">
  8. <propertyname="targetConnectionFactory"
  9. ref="jmsConnectionFactory"/>
  10. </bean>
  11. </property>
  12. //消息转换器
  13. <propertyname="messageConverter"
  14. ref="resourceMessageConverter"/>
  15.         </bean>  
复制代码

<!--JMSActiveMQConfig--><beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">//连接工厂<propertyname="connectionFactory"><beanclass="org.springframework.jms.connection.SingleConnectionFactory"><propertyname="targetConnectionFactory"ref="jmsConnectionFactory"/></bean></property>//消息转换器<propertyname="messageConverter"ref="resourceMessageConverter"/></bean>

消息生产者

消息生产者用于生产消息,也就是将消息发送出去。在这里我们使用上面配置好的JmsTemplate来发送发送消息。这个类的代码我就不贴出来了,大家可以在实例中查看。配置消息生产者需要JmsTemplate与消息队列(Queue)。

  1. <bean id="resourceMessageProducer"  
  2. class="com.jmspool.jms.ResourceMessageProducer">
  3. <propertyname="template">
  4. <refbean="jmsTemplate"></ref>
  5. </property>
  6. <propertyname="destination">
  7. <refbean="destination"></ref>
  8. </property>
  9.         </bean>  
复制代码

<beanid="resourceMessageProducer"class="com.jmspool.jms.ResourceMessageProducer"><propertyname="template"><refbean="jmsTemplate"></ref></property><propertyname="destination"><refbean="destination"></ref></property></bean>

消息接收处理者

消息接收者(MDP)使用Spring的MessageListenerAdapter,指定负责处理消息的POJO及其方法名,绑定消息转换器Converter。

  1. <bean id="resourceMessageListener"  
  2. class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  3. <constructor-arg>
  4. <refbean="resourceMessageConsumer"></ref>
  5. </constructor-arg>
  6. <propertyname="defaultListenerMethod"value="addResource"/>
  7. <propertyname="messageConverter"ref="resourceMessageConverter"/>
  8.         </bean>   
复制代码

<beanid="resourceMessageListener"class="org.springframework.jms.listener.adapter.MessageListenerAdapter"><constructor-arg><refbean="resourceMessageConsumer"></ref></constructor-arg><propertyname="defaultListenerMethod"value="addResource"/><propertyname="messageConverter"ref="resourceMessageConverter"/></bean>

监听容器的配置

监听容器的任务是负责调度MDP, 绑定connectionFactory,Queue和MDP。更重要的一点,这里也是将线程执行器与ActiveMQ整合的切入点。先看下配置文件:

  1. <bean id="resourcelistenerContainer"    
  2. class="org.springframework.jms.listener.SimpleMessageListenerContainer">
  3. <propertyname="connectionFactory"ref="jmsConnectionFactory"/>
  4. <propertyname="autoStartup"value="true"/>
  5. <propertyname="concurrentConsumers"value="6"/>
  6. <propertyname="destination"ref="destination"/>
  7. <propertyname="messageListener"ref="resourceMessageListener"/>
  8. <propertyname="sessionTransacted"value="true"/>
  9. <FONTcolor=red><propertyname="taskExecutor"ref="threadPoolExecutor"/></FONT>
  10.         </bean>  
复制代码

<beanid="resourcelistenerContainer"class="org.springframework.jms.listener.SimpleMessageListenerContainer"><propertyname="connectionFactory"ref="jmsConnectionFactory"/><propertyname="autoStartup"value="true"/><propertyname="concurrentConsumers"value="6"/><propertyname="destination"ref="destination"/><propertyname="messageListener"ref="resourceMessageListener"/><propertyname="sessionTransacted"value="true"/><propertyname="taskExecutor"ref="threadPoolExecutor"/></bean>这里通过taskExecutor属性将已经配置好的线程执行器threadPoolExecutor与JMS(ActiveMQ)整合起来。

消息消费者的配置

消息消费者是消息的最终处理部分,开始文章里面说说的整合线程的目的,也就是为了使得处理者能够并行的处理接收到的消息。同样这里为了简洁也不将代码贴出来。配置时需要绑定线程池执行器。

  1. <bean id="resourceMessageConsumer"  
  2. class="com.jmspool.jms.ResourceMessageConsumer"abstract="false"
  3. lazy-init="default"autowire="default"dependency-check="default">
  4. <propertyname="threadPoolExecutor">
  5. <refbean="threadPoolExecutor"></ref>
  6. </property>
  7.         </bean>  
复制代码

<beanid="resourceMessageConsumer"class="com.jmspool.jms.ResourceMessageConsumer"abstract="false"lazy-init="default"autowire="default"dependency-check="default"><propertyname="threadPoolExecutor"><refbean="threadPoolExecutor"></ref></property></bean>至此,我们的整合工作就完成了。在上述文中仅仅简单的描述了配置的步骤,对于原理及各类的属性意义并没有详细的描述。因为网上有不少这样的文章。

写在最后:

我们配置的所有步骤其实都是围绕着Spring展开的,虽然我们完成的是一个ActiveMQ与Spring线程池的整合实例,但是如果换做其他MQ中间件,此法同样是适用的。因为我们整合的层面是在Spring上。

相关推荐