activemq和spring结合使用
activemq上一次已经讲解了安装、启动、数据库的持久化配置等。
这次主要记录下,如何跟spring结合使用,如何发送消息以及进行消费。
消息产生者向JMS发送消息的步骤
(1)创建连接使用的工厂类JMSConnectionFactory
(2)使用管理对象JMSConnectionFactory建立连接Connection
(3)使用连接Connection建立会话Session
(4)使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)使用消息生产者MessageSender发送消息
消息消费者从JMS接收消息的步骤
(1)创建连接使用的工厂类JMSConnectionFactory
(2)使用管理对象JMSConnectionFactory建立连接Connection
(3)使用连接Connection建立会话Session
(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver
(5)使用消息消费者MessageReceiver接收消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver
1.下面是使用了JTA配置、以及mq进行数据库持久化的spring相关配置。
<!-- XA-capable ActiveMQ connection factory pointing at the broker -->
<bean id="xaMQFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<!-- JMS connection factory wrapped by Atomikos so that it is managed by JTA transactions -->
<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
      init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="QUEUE_BROKER"/>
    <property name="xaConnectionFactory" ref="xaMQFactory"/>
</bean>

<!-- Destination used for sending, in queue (point-to-point) mode -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="solrMessageQueue"/>
</bean>

<!-- JdbcTemplate bound to the XA data source declared below -->
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="dataSource"/>
</bean>

<!-- Database connection settings, using the Atomikos JTA data source -->
<bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
      init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="ZZGRID_MAIN_JDBC_RESOURCE"/>
    <property name="xaDataSourceClassName" value="oracle.jdbc.xa.client.OracleXADataSource"/>
    <property name="xaProperties">
        <props>
            <prop key="user">username</prop>
            <prop key="password">password</prop>
            <prop key="URL">jdbc:oracle:thin:@localhost:1521:oracle</prop>
        </props>
    </property>
</bean>
2.不使用JTA,只进行数据库持久化的Spring配置
<!-- Plain (non-XA) connection factory for the ActiveMQ broker -->
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<!-- Destination used for sending, in queue (point-to-point) mode -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="solrMessageQueue"/>
</bean>

<!-- JdbcTemplate bound to the pooled data source declared below -->
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="dataSource"/>
</bean>

<!-- Database connection settings: c3p0 pooled data source for Oracle -->
<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
    <property name="driverClass" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="jdbcUrl" value="jdbc:oracle:thin:@localhost:1521:oracle"/>
    <property name="user" value="username"/>
    <property name="password" value="password"/>
    <property name="acquireIncrement" value="5"/>
    <property name="acquireRetryAttempts" value="30"/>
    <property name="acquireRetryDelay" value="1000"/>
    <property name="idleConnectionTestPeriod" value="60"/>
    <property name="testConnectionOnCheckin" value="true"/>
    <property name="automaticTestTable" value="C3P0Table"/>
</bean>
3.使用MQ默认的硬盘读写,结合spring的配置
<!-- Connection factory for the ActiveMQ broker -->
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<!-- Destination used for sending, in queue (point-to-point) mode.
     Declared exactly once: Spring rejects duplicate bean ids within one XML file. -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="solrMessageQueue"/>
</bean>

<!-- Template used for sending messages. It must be a JmsTemplate: JdbcTemplate has no
     "connectionFactory" property, so the context would fail to start with that class.
     The former bean name "jdbcTemplate" is kept as an alias for existing references. -->
<bean id="jmsTemplate" name="jdbcTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>
MQ消息体(body)――JMS中定义了5种消息体:BytesMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。这里演示通过自定义的MessageConverter,把业务对象SolrMessage转换成MapMessage进行发送和接收。
消息发送主体
package xxxx; import javax.jms.ConnectionFactory; import javax.jms.Destination; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import com.xxx.jms.SolrMessage; import com.xxx.jms.SolrMessageConvert; @Component("activeMQMessageSender") public class ActiveMQMessageSender implements ApplicationContextAware{ private ApplicationContext appContext; @Override public void setApplicationContext(ApplicationContext appContext) throws BeansException { this.appContext=appContext; } public void send(SolrMessage solrMessage) { String isSenderSolrMsg = "true"; if (isSenderSolrMsg != null && Boolean.valueOf(isSenderSolrMsg)){ JmsTemplate template = new JmsTemplate(); template.setConnectionFactory((ConnectionFactory) appContext.getBean("connectionFactory")); template.setDefaultDestination((Destination) appContext.getBean("queueDestination")); template.setSessionTransacted(true);//如果不使用事务,这里修改成false template.setMessageConverter(new SolrMessageConvert()); template.convertAndSend(solrMessage); } }}
消息对象
package com.xxx.jms;

/**
 * Payload exchanged over the queue: an optional numeric id, a type, a mode and an
 * optional id-card number. All fields are nullable and mutable through setters.
 */
public class SolrMessage {

    private Long id;
    private String type;
    private String mode;
    private String idCardNo;

    /** Creates an empty message; every field starts out as {@code null}. */
    public SolrMessage() {
    }

    /**
     * Creates a message identified by id-card number. The mode is set to the string
     * form of {@code Mode.DELETE} ({@code Mode} is declared outside this snippet).
     *
     * @param idCardNo the id-card number
     * @param type     the message type
     */
    public SolrMessage(String idCardNo, String type) {
        this.idCardNo = idCardNo;
        this.mode = Mode.DELETE.toString();
        this.type = type;
    }

    /**
     * Creates a message identified by numeric id.
     *
     * @param id   the record id
     * @param type the message type
     * @param mode the operation mode
     */
    public SolrMessage(Long id, String type, String mode) {
        this.id = id;
        this.type = type;
        this.mode = mode;
    }

    /** @return the record id, possibly {@code null} */
    public Long getId() {
        return this.id;
    }

    /** @param id the record id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the message type, possibly {@code null} */
    public String getType() {
        return this.type;
    }

    /** @param type the message type */
    public void setType(String type) {
        this.type = type;
    }

    /** @return the operation mode, possibly {@code null} */
    public String getMode() {
        return this.mode;
    }

    /** @param mode the operation mode */
    public void setMode(String mode) {
        this.mode = mode;
    }

    /** @return the id-card number, possibly {@code null} */
    public String getIdCardNo() {
        return this.idCardNo;
    }

    /** @param idCardNo the id-card number */
    public void setIdCardNo(String idCardNo) {
        this.idCardNo = idCardNo;
    }
}
消息转换类:负责业务对象SolrMessage与JMS消息(MapMessage)之间的相互转换
package com.xxx.jms; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.support.converter.MessageConversionException; import org.springframework.jms.support.converter.MessageConverter; public class SolrMessageConvert implements MessageConverter { @Override public Object fromMessage(Message message) throws JMSException, MessageConversionException { SolrMessage solrMessage = new SolrMessage(); if (message != null) { MapMessage msg = (MapMessage) message; solrMessage.setId(msg.getLong("id")); solrMessage.setIdCardNo(msg.getString("idCardNo")); solrMessage.setMode(msg.getString("mode")); solrMessage.setType(msg.getString("type")); } return solrMessage; } @Override public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException { MapMessage mapMessage = session.createMapMessage(); if (null != object) { SolrMessage solrMessage = (SolrMessage) object; if (null != solrMessage.getId()) { mapMessage.setLong("id", solrMessage.getId()); } if (null != solrMessage.getType()) { mapMessage.setString("type", solrMessage.getType()); } if (null != solrMessage.getMode()) { mapMessage.setString("mode", solrMessage.getMode()); } if (null != solrMessage.getIdCardNo()) { mapMessage.setString("idCardNo", solrMessage.getIdCardNo()); } } return mapMessage; }}
mq消息的消费也是类似的,主要涉及的spring配置如下
<!-- Message converter between SolrMessage objects and JMS messages.
     The class lives in the com.xxx.jms package, so it is referenced by that name. -->
<bean id="messageConvert" class="com.xxx.jms.SolrMessageConvert"/>

<!-- Message consumption: adapter that converts each incoming message with the
     converter above and passes the result to SolrRegisterListener.onRegister -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="com.xxx.SolrRegisterListener"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="onRegister"/>
    <property name="messageConverter" ref="messageConvert"/>
</bean>

<!-- Listener container that receives messages from the queue and hands them to the adapter -->
<bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="queueDestination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>
消息消费类(接收类)
package com.xxx; import org.springframework.beans.factory.annotation.Autowired; import com.xxx.solr.domain.SolrMessage; public class SolrRegisterListener { public void onRegister(SolrMessage solrMessage){ System.out.println(solrMessage.getId() + " " + solrMessage.getMode() + " " + solrMessage.getType() + " " + solrMessage.getIdCardNo()); /**具体的业务逻辑 **对接收到的消息进行消费 **/ }}
这个编辑器有些不好用,预览模式和编辑模式以及正式的页面效果都不完全一致。。。。