ActiveMQ

消息机制

测试代码如下:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:spring/applicationContext.xml"})
public class UserMqTest {
    @Autowired
    MessageService messageService;
   
//    @Ignore
    @Test
    public void sendMessage(){
        UserMq user = new UserMq();
        user.setUid("a");
        user.setAddress("魔都");
        messageService.sendMessage(user);
        System.out.println("消息已发送");
       
//        UserMq user1 = new UserMq();
//        user1 = messageService.receiveMessage();
//       
//        System.out.println("已获取消息:"+user1.getUid()+user1.getAddress());
//        assertEquals(user1,null);
    }
   
    @Ignore
    @Test
    public void veceiveMessage(){
        UserMq user = new UserMq();
        user=messageService.receiveMessage();
        if (null==user){
            System.out.println("尼玛,是个空的");
        }else{
            System.out.println("已获取消息:"+user.getUid()+user.getAddress());
        }
    }
}

jms代码:


@Service
public class MessageServiceImpl implements MessageService {
    
    @Autowired
    Destination destination;
    
    @Autowired
    JmsTemplate jmsTemplate;
    
    /**
     * 发送消息
     */
    public void sendMessage(final UserMq user){
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                ObjectMessage msg = session.createObjectMessage(user);
//                TextMessage msg = session.createTextMessage("hello world");
                //msg.setStringProperty(name, value)
                //设置域属性,用于messageSelector过滤,暂时不用
                msg.setStringProperty("aaaa", "bbbb");
                return msg;
            }
        });
    }
    
    /**
     * 接受消息
     */
    public UserMq receiveMessage(){
        ObjectMessage receivedMessage = (ObjectMessage) jmsTemplate.receive(destination);
        try {
            return (UserMq) receivedMessage.getObject();
        } catch (JMSException e) {
            throw JmsUtils.convertJmsAccessException(e);
        }
    }
      
}

spirng 配置(独立的xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:jms="http://www.springframework.org/schema/jms"
 xmlns:amq="http://activemq.apache.org/schema/core"
 xsi:schemaLocation="http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

<!--   <amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616"/> -->
  <amq:connectionFactory id="connectionFactory" brokerURL="${jms.tcp}"/>  
  <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!--         <constructor-arg index="0" value="ecs.alert.queue"></constructor-arg> -->
    <constructor-arg index="0" value="${jms.destination}"></constructor-arg>
  </bean>

<!--   <bean id="messageHandler" class="com.rockey.emonitor.jms.controller.MessageHandler" /> -->

  <bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <!-- 接收超时时间:1s -->
        <property name="receiveTimeout" value="${jms.receiveTimeout}"/>
        <!-- 发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
        <property name="deliveryMode" value="${jms.deliveryMode}"/>
        <!-- 类型转换 (可以做一些前置和后置处理过滤信息等)-->
        <!-- <property name="messageConverter" ref="jmsConverter"/> -->
  </bean>
 
<!--     <bean name="jmsConverter" class="com.rockey.emonitor.jms.util.JmsTrxConverter" /> -->
   
<!--     <bean name="messageService" class="com.rockey.emonitor.jms.service.impl.MessageServiceImpl" /> -->
 
<!--       <bean name="messageReceiver" class="com.rockey.emonitor.jms.service.impl.MessageReceiverImpl" /> -->
 
 
<!--  监听处理
    <jms:listener-container connection-factory="connectionFactory">
    <jms:listener destination="emonitor.alert.queue" ref="messageHandler" method="processMessage" />
  </jms:listener-container>   -->

</beans>
参数配置文件:

#服务端地址
jms.tcp=tcp://localhost:61616
#目标名称
jms.destination=ecs.alert.queue
#超时时间(毫秒)
jms.receiveTimeout=1000
#消息的发送模式,1、非持久化。2、持久化
jms.deliveryMode=2

pom文件:

<!-- ActiveMQ start -->
          <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq</artifactId>
            <version>5.9.0</version>
        </dependency>
       
        <!-- jmsTemplete的依赖包 start安装该包会自动下载 Spring-1.2.4jar,这个包就包含有jms-->
          <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring-common</artifactId>
            <version>2.8</version>
        </dependency>
          <!-- jmsTemplete的依赖包 end -->
       
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.7</version>
        </dependency>
          <!-- ActiveMQ end -->

调用方代码目前没写,以后补上

相关推荐