ActiveMQ
消息机制
测试代码如下:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:spring/applicationContext.xml"})
public class UserMqTest {
@Autowired
MessageService messageService;
// @Ignore
@Test
public void sendMessage(){
UserMq user = new UserMq();
user.setUid("a");
user.setAddress("魔都");
messageService.sendMessage(user);
System.out.println("消息已发送");
// UserMq user1 = new UserMq();
// user1 = messageService.receiveMessage();
//
// System.out.println("已获取消息:"+user1.getUid()+user1.getAddress());
// assertEquals(user1,null);
}
@Ignore
@Test
public void veceiveMessage(){
UserMq user = new UserMq();
user=messageService.receiveMessage();
if (null==user){
System.out.println("尼玛,是个空的");
}else{
System.out.println("已获取消息:"+user.getUid()+user.getAddress());
}
}
}
jms代码:
@Service
public class MessageServiceImpl implements MessageService {
@Autowired
Destination destination;
@Autowired
JmsTemplate jmsTemplate;
/**
* 发送消息
*/
public void sendMessage(final UserMq user){
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
ObjectMessage msg = session.createObjectMessage(user);
// TextMessage msg = session.createTextMessage("hello world");
//msg.setStringProperty(name, value)
//设置域属性,用于messageSelector过滤,暂时不用
msg.setStringProperty("aaaa", "bbbb");
return msg;
}
});
}
/**
* 接受消息
*/
public UserMq receiveMessage(){
ObjectMessage receivedMessage = (ObjectMessage) jmsTemplate.receive(destination);
try {
return (UserMq) receivedMessage.getObject();
} catch (JMSException e) {
throw JmsUtils.convertJmsAccessException(e);
}
}
}
spirng 配置(独立的xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- <amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616"/> -->
<amq:connectionFactory id="connectionFactory" brokerURL="${jms.tcp}"/>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- <constructor-arg index="0" value="ecs.alert.queue"></constructor-arg> -->
<constructor-arg index="0" value="${jms.destination}"></constructor-arg>
</bean>
<!-- <bean id="messageHandler" class="com.rockey.emonitor.jms.controller.MessageHandler" /> -->
<bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<!-- 接收超时时间:1s -->
<property name="receiveTimeout" value="${jms.receiveTimeout}"/>
<!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
<property name="deliveryMode" value="${jms.deliveryMode}"/>
<!-- 类型转换 (可以做一些前置和后置处理过滤信息等)-->
<!-- <property name="messageConverter" ref="jmsConverter"/> -->
</bean>
<!-- <bean name="jmsConverter" class="com.rockey.emonitor.jms.util.JmsTrxConverter" /> -->
<!-- <bean name="messageService" class="com.rockey.emonitor.jms.service.impl.MessageServiceImpl" /> -->
<!-- <bean name="messageReceiver" class="com.rockey.emonitor.jms.service.impl.MessageReceiverImpl" /> -->
<!-- 监听处理
<jms:listener-container connection-factory="connectionFactory">
<jms:listener destination="emonitor.alert.queue" ref="messageHandler" method="processMessage" />
</jms:listener-container> -->
</beans>
参数配置文件:
#服务端地址
jms.tcp=tcp://localhost:61616
#目标名称
jms.destination=ecs.alert.queue
#超时时间(毫秒)
jms.receiveTimeout=1000
#消息的发送模式,1、非持久化。2、持久化
jms.deliveryMode=2
pom文件:
<!-- ActiveMQ start -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq</artifactId>
<version>5.9.0</version>
</dependency>
<!-- jmsTemplete的依赖包 start安装该包会自动下载 Spring-1.2.4jar,这个包就包含有jms-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring-common</artifactId>
<version>2.8</version>
</dependency>
<!-- jmsTemplete的依赖包 end -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.7</version>
</dependency>
<!-- ActiveMQ end -->
调用方代码目前没写,以后补上