activeMQ完整的demo,值得你拥有

               

                      最近项目里面要求实时的分析数据,唉,storm学习成本太高,所以就想到了activeMQ. Apache ActiveMQ是最流行的功能强大的开源即时通讯和集成模式的服务器。Apache ActiveMQ的速度快,支持多语言和跨客户协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。废话不多说,直接上demo了。

===============================     action       start  =========================

 一:必备jar包

<dependency>

    <groupId>org.apache.activemq</groupId>

    <artifactId>activemq-all</artifactId>

    <version>5.9.0</version>

</dependency>

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-pool</artifactId>

<version>5.9.0</version>

</dependency>

二:服务端

            服务端包含4部分:

                    a.消息转换器

                    b.消息发布者

                    c.消息发送

                    d.activemq.xml配置

 a.消息转换器

package cn.innosoft.jt809.activemq.converter;

import java.math.BigDecimal;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.springframework.jms.support.converter.MessageConversionException;

import org.springframework.jms.support.converter.MessageConverter;

import cn.innosoft.jt809.biz.dynamic.model.TGnssGpsHis;

/**

 * jms消息转换器

 * @author gaoq

 * @date 2015-3-27 下午2:57:23

 */

public class MsgConverter implements MessageConverter{

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

@Override

public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {

 if (!(object instanceof GpsVehicleReceive)) {  

           throw new MessageConversionException("obj is not MsgPojo");  

       }  

  GpsVehicleReceive msgPojo = (GpsVehicleReceive) object;  

       TextMessage textMessage = session.createTextMessage();  

       String msg = getGpsMsgString(msgPojo);

       textMessage.setText(msg);  

       return  textMessage;  

}

@Override

public Object fromMessage(Message message) throws JMSException, MessageConversionException {

if (!(message instanceof TextMessage)) {  

           throw new MessageConversionException("Message is not TextMessage");  

       }  

       TextMessage textMessage = (TextMessage) message;  

       TGnssGpsHis msg = new TGnssGpsHis();  

       String[] texts=textMessage.getText().split(",");  

         msg.setX(1);

} catch (ParseException e) {

e.printStackTrace();

}

       return msg;  

}

/**

* 获取对象转换后的字符串.

* @param msgPojo TGnssGpsHis

* @return String

*/

private String getGpsMsgString(GpsVehicleReceive msgPojo) {

String msg = msgPojo.getX()+","+msgPojo.getY();

return msg;

}

}

b.消息发布者

package cn.innosoft.jt809.activemq.service;

import javax.jms.Destination;

import org.springframework.jms.core.JmsTemplate;

import cn.innosoft.jt809.activemq.converter.GpsVehicleReceive;

/**

 * 消息发布者.

 * @author gaoq

 * @date 2015-3-19 上午11:12:00

 */

public class TopicPublisherService {

JmsTemplate jmsTemplate;

Destination destination;

//  public void send(final String msg) {  

//         MessageCreator messageCreator = new MessageCreator() {  

//             public Message createMessage(Session session) throws JMSException {  

//                 TextMessage message = session.createTextMessage();  

//                 message.setText(msg);  

//                 return message;  

//             }  

//         };  

//         jmsTemplate.send(this.destination, messageCreator);

//     }   

public void convertAndSend(GpsVehicleReceive msgPojo){  

        jmsTemplate.convertAndSend(this.destination, msgPojo);  

    }  

public void setJmsTemplate(JmsTemplate jmsTemplate) {

this.jmsTemplate = jmsTemplate;

}

public void setDestination(Destination destination) {

this.destination = destination;

}

}

 c.消息发送(在应用类中,获取数据并发送)

private static void sendMsg(TGnssGpsHis gpsHis){

GpsVehicleReceive r = new GpsVehicleReceive();

r.setX(1);

r.setY(2); 

topicPublisherService.convertAndSend(r);//发送

r = null;

}

d.activemq.xml配置

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans 

           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

           http://www.springframework.org/schema/context

           http://www.springframework.org/schema/context/spring-context-3.0.xsd  

           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd

           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  

   <property name="connectionFactory">  

      <bean class="org.apache.activemq.ActiveMQConnectionFactory">  

          <property name="brokerURL" value="tcp://127.0.0.1:9220" />  

      </bean>  

  </property>  

</bean>  

    <!-- 发送消息的目的地(主题) -->  

    <bean id="topicSubscriberMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">  

        <constructor-arg index="0" value="myMessageListenerTopic" />  

    </bean>

    

    <bean id="msgConverter" class="cn.innosoft.jt809.activemq.converter.MsgConverter"></bean>

    <!-- 配置TopicJms模板  -->  

    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">  

        <property name="connectionFactory" ref="connectionFactory" />  

        <property name="defaultDestination" ref="topicSubscriberMessageListenerDest" />  

        <!-- 配置是否为发布订阅者模式,默认为false -->  

        <property name="pubSubDomain" value="true"/>  

<!-- 转换器 -->

        <property name="messageConverter" ref="msgConverter"></property>

    <!--<property name="receiveTimeout" value="10000" />  -->  

    </bean>  

    

    <bean id="topicPublisherService" class="cn.innosoft.jt809.activemq.service.TopicPublisherService">  

       <property name="jmsTemplate" ref="jmsTopicTemplate"/>  

        <property name="destination" ref="topicSubscriberMessageListenerDest" />

    </bean>

</beans>

 三:客户端使用

                客户端使用包含2部分:

                     a.监听类

                     b.activeMq.xml

 a.监听类

package cn.innosoft.exceptionAnalyse.activemq.service;

import java.text.SimpleDateFormat;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.TextMessage;

import cn.innosoft.exceptionAnalyse.biz.AnalyseMsgMng;

import cn.innosoft.exceptionAnalyse.biz.cache.model.GpsVehicleReceive;

/**

 * 经度异常接收数据监听类.

 * @author gaoq

 * @date 2015-3-27 下午4:32:30

 */

public class TopicColorExceptionService implements MessageListener{

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public void onMessage(Message message) {  

       if(message instanceof TextMessage){  

        TextMessage textMessage = (TextMessage) message; 

           try {  

            GpsVehicleReceive msg = bindProperties(textMessage);

               // AnalyseMsgMng.getInstance().colorQueue.put(msg);

           } catch (Exception e) {  

               e.printStackTrace();  

           }  

       }  

   }

/**

 * 绑定参数到对象上.

 * @param textMessage 消息.

 * @return GpsVehicleReceive

 */

private GpsVehicleReceive bindProperties(TextMessage textMessage) throws Exception{

GpsVehicleReceive msg = new GpsVehicleReceive();  

String[] texts=textMessage.getText().split(",");  

    msg.setX1(texts[0]);

              msg.setX2(texts[1]);

                    msg.setX3(texts[2]);

return msg;

}

 b.activeMq.xml

    <?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans 

           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

           http://www.springframework.org/schema/context

           http://www.springframework.org/schema/context/spring-context-3.0.xsd  

           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd

           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

<bean id="connectionFactory"  

        class="org.apache.activemq.ActiveMQConnectionFactory">  

        <property name="brokerURL" value="tcp://127.0.0.1:9220" />  

    </bean>  

      

    <!-- 发送消息的目的地(主题) -->  

     <bean id="topicSubscriberMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">  

        <constructor-arg index="0" value="myMessageListenerTopic" />  

    </bean>  

    

<!--     <bean id="msgConverter" class="cn.innosoft.freightAnalyse.activemq.converter.MsgConverter"></bean> -->

    <!-- 配置TopicJms模板  -->  

    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">  

        <property name="connectionFactory" ref="connectionFactory" />  

        <property name="defaultDestination" ref="topicSubscriberMessageListenerDest" />  

        <!-- 配置是否为发布订阅者模式,默认为false -->  

        <property name="pubSubDomain" value="true"/>  

    </bean>  

    

    <bean id="topicColorException" class="cn.innosoft.exceptionAnalyse.activemq.service.TopicColorExceptionService"></bean> 

    <bean id="myMsgTopiclistenerContainerColor"  

        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

        <property name="connectionFactory" ref="connectionFactory" />  

        <property name="destination" ref="topicSubscriberMessageListenerDest" />  

        <property name="messageListener" ref="topicColorException" />  

        <property name="pubSubDomain" value="true" />  

    </bean> 

</beans>   

=============================    action   end           ==========================

综上所述:一个完整的activeMQ的使用方式介绍完了。

接下来干啥呢。。。。。

想起来了。。。。activeMQ的服务还要启动,那么如何启动activeMQ的服务呢?

https://repository.apache.org/content/repositories/releases/org/apache/activemq/

下载了activeMQ之后

apache-activemq-5.9.0-bin.zip

                      解压后会发现:apache-activemq-5.9.0\bin目录下有win32,win64,那这个时候就要看服务器是什么系统了,我的是64位,所以就进去apache-activemq-5.9.0\bin\win64目录双击activemq.bat服务如果启动无异常就好了,然后在浏览器中访问:http://localhost:8161/admin,用户名:admin,密码:admin (这个应该是固定的,如果想改,也是可以得到自己去配置文件中找到位置改掉就ok了)

大功告成。。。。。。。。。。。。。。。

相关推荐