activemq和spring结合使用

activemq上一次已经讲解了安装、启动、数据库的持久化配置等。

这次主要记录下,如何跟spring结合使用,如何发送消息以及进行消费。

消息产生者向JMS发送消息的步骤

(1)创建连接使用的工厂类JMSConnectionFactory

(2)使用管理对象JMSConnectionFactory建立连接Connection

(3)使用连接Connection建立会话Session

(4)使用会话Session和管理对象Destination创建消息生产者MessageSender

(5)使用消息生产者MessageSender发送消息

消息消费者从JMS接受消息的步骤

(1)创建连接使用的工厂类JMSConnectionFactory

(2)使用管理对象JMSConnectionFactory建立连接Connection

(3)使用连接Connection建立会话Session

(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver

(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver

 

 1.下面是使用了JTA配置、以及mq进行数据库持久化的spring相关配置。

 <!--建立mqfactory-->
<bean id="xaMQFactory"
        class="org.apache.activemq.ActiveMQXAConnectionFactory">
        	<property name="brokerURL" value="tcp://localhost:61616" />
    </bean>
<!--建立connectionfactory,用jta的事务进行管理-->
<bean id="connectionFactory" 
          class="com.atomikos.jms.AtomikosConnectionFactoryBean" 
          init-method="init" destroy-method="close">
        <property name="uniqueResourceName">
            <value>QUEUE_BROKER</value>
        </property>
        <property name="xaConnectionFactory">
            <ref bean="xaMQFactory"/>
        </property>
 </bean>
<!--建立发送队列,以queuem模式-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> 
            <constructor-arg index="0" value="solrMessageQueue"/> 
 </bean>	
<!--jdbcTemplate 用于发送消息,由于是使用数据库持久化-->

 <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource">
            <ref bean="dataSource"/>
       </property>
    </bean>

<!--数据库连接信息,使用了Atomikos的JTA事务机制-->
<bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
        <property name="uniqueResourceName">
            <value>ZZGRID_MAIN_JDBC_RESOURCE</value>
        </property>
  <property name="xaDataSourceClassName" value="oracle.jdbc.xa.client.OracleXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="user">username</prop>
                <prop key="password">password</prop>
                <prop key="URL" >jdbc:oracle:thin:@localhost:1521:oracle</prop>
            </props>
        </property>  
 </bean>

2.不使用JTA,只进行数据库持久化的Spring配置

<!--建立connectionfactory-->
 <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> 
                <property name="brokerURL" value="tcp://localhost:61616"/> 
  </bean> 

<!--建立发送队列,以queue模式-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> 
            <constructor-arg index="0" value="solrMessageQueue"/> 
 </bean>	
<!--jdbcTemplate 用于发送消息,由于是使用数据库持久化--> 
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">        
         <property name="dataSource">            
	<ref bean="dataSource"/>      
         </property>   
 </bean>
<!--数据库连接信息--> 
 <bean id="dataSource"
  class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
  <property name="driverClass"
   value="oracle.jdbc.driver.OracleDriver" />
  <property name="jdbcUrl" value="jdbc:oracle:thin:@localhost:1521:oracle"/>
  <property name="user" value="username" />
  <property name="password" value="password" />
  <property name="acquireIncrement" value="5" />
  <property name="acquireRetryAttempts" value="30" />
  <property name="acquireRetryDelay" value="1000" />
  <property name="idleConnectionTestPeriod" value="60" />
  <property name="testConnectionOnCheckin" value="true" />
  <property name="automaticTestTable" value="C3P0Table" /> 
 </bean>

3.使用MQ默认的硬盘读写,结合spring的配置

<!--建立connectionfactory-->
 <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">                 <property name="brokerURL" value="tcp://localhost:61616"/>   </bean> <!--建立发送队列,以queue模式--><bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> 
            <constructor-arg index="0" value="solrMessageQueue"/> 
 </bean>	

<!--建立发送队列,以queue模式-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> 
            <constructor-arg index="0" value="solrMessageQueue"/> 
 </bean>	


<!--jdbcTemplate 用于发送消息--> 
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">                 
 <property name="connectionFactory" ref="connectionFactory"/>     
</bean>

 MQ消息体(body)――JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。这里演示相对最复杂的 ObjectMessage。

消息发送主体

package xxxx;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import com.xxx.jms.SolrMessage;
import com.xxx.jms.SolrMessageConvert;
@Component("activeMQMessageSender")
public class ActiveMQMessageSender implements ApplicationContextAware{
	
	private ApplicationContext appContext;

	@Override
	public void setApplicationContext(ApplicationContext appContext)
			throws BeansException {
		this.appContext=appContext;
	}
	
	public void send(SolrMessage solrMessage) {
		String isSenderSolrMsg = "true";
		if (isSenderSolrMsg != null && Boolean.valueOf(isSenderSolrMsg)){
			JmsTemplate template = new JmsTemplate();
			template.setConnectionFactory((ConnectionFactory) appContext.getBean("connectionFactory"));
			template.setDefaultDestination((Destination) appContext.getBean("queueDestination"));
			template.setSessionTransacted(true);//如果不使用事务,这里修改成false
			template.setMessageConverter(new SolrMessageConvert());
			template.convertAndSend(solrMessage);
		}
	}}

消息对象 

package com.xxx.jms;

public class SolrMessage {
	private Long id;
	private String type;
	private String mode;
	private String idCardNo;

	public Long getId() {
		return id;
	}

	public void setId(Long id) {
		this.id = id;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public String getMode() {
		return mode;
	}

	public void setMode(String mode) {
		this.mode = mode;
	}

	public String getIdCardNo() {
		return idCardNo;
	}

	public void setIdCardNo(String idCardNo) {
		this.idCardNo = idCardNo;
	}

	public SolrMessage(String idCardNo, String type) {
		this.idCardNo = idCardNo;
		this.mode = Mode.DELETE.toString();
		this.type = type;
	}

	public SolrMessage(Long id, String type, String mode) {
		this.id = id;
		this.type = type;
		this.mode = mode;
	}

	public SolrMessage() {

	}}

 msg消息和send消息的转化类

package com.xxx.jms;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

public class SolrMessageConvert implements MessageConverter {

	@Override
	public Object fromMessage(Message message) throws JMSException,
			MessageConversionException {
		SolrMessage solrMessage = new SolrMessage();
		if (message != null) {
			MapMessage msg = (MapMessage) message;
			solrMessage.setId(msg.getLong("id"));
			solrMessage.setIdCardNo(msg.getString("idCardNo"));
			solrMessage.setMode(msg.getString("mode"));
			solrMessage.setType(msg.getString("type"));
		}

		return solrMessage;
	}

	@Override
	public Message toMessage(Object object, Session session)
			throws JMSException, MessageConversionException {
		MapMessage mapMessage = session.createMapMessage();
		if (null != object) {
			SolrMessage solrMessage = (SolrMessage) object;

			if (null != solrMessage.getId()) {
				mapMessage.setLong("id", solrMessage.getId());
			}
			if (null != solrMessage.getType()) {
				mapMessage.setString("type", solrMessage.getType());
			}
			if (null != solrMessage.getMode()) {
				mapMessage.setString("mode", solrMessage.getMode());
			}
			if (null != solrMessage.getIdCardNo()) {
				mapMessage.setString("idCardNo", solrMessage.getIdCardNo());
			}
		}

		return mapMessage;
	}}

mq消息的消费也是类似的,主要涉及的spring配置如下

<!--消息转化类,用于send消息和mqmsg消息的转化-->
<bean id="messageConvert" class="com.xxx.SolrMessageConvert" />
  <!--消息的消费-->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
            <constructor-arg>
            <bean class="com.xxx.SolrRegisterListener"/>
            </constructor-arg>
            <property name="defaultListenerMethod" value="onRegister"/>
            <property name="messageConverter" ref="messageConvert"/>
    </bean>
   <!--接收mq消息的监听器,用于检测mq消息的消费--> 
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="queueDestination"/>
            <property name="messageListener" ref="messageListener"/>
    </bean>

消息消费类(接收类)

package com.xxx;

import org.springframework.beans.factory.annotation.Autowired;
import com.xxx.solr.domain.SolrMessage;


public class SolrRegisterListener {
		
	public void onRegister(SolrMessage solrMessage){
		System.out.println(solrMessage.getId() + "	"
				+ solrMessage.getMode() + "	"
				+ solrMessage.getType() + "	"
				+ solrMessage.getIdCardNo());
	/**具体的业务逻辑
 	**对接收到的消息进行消费
	**/
}}

这个编辑器有些不好用,预览模式和编辑模式以及正式的页面效果都不完全一致。。。。

相关推荐