使用 ActiveMQ 示例
企业中各项目中相互协作的时候可能用得到消息通知机制。比如有东西更新了,可以通知做索引。
在Java里有JMS的多个实现。其中apache下的ActiveMQ就是不错的选择。还有一个比较热的是RabbitMQ(是erlang语言实现的)。这里示例下使用ActiveMQ
用ActiveMQ最好还是了解下JMS
JMS公共点对点域发布/订阅域
ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory
ConnectionQueueConnectionTopicConnection
DestinationQueueTopic
SessionQueueSessionTopicSession
MessageProducerQueueSenderTopicPublisher
MessageConsumerQueueReceiverTopicSubscriber
JMS定义了两种方式:Quere(点对点);Topic(发布/订阅)。
ConnectionFactory是连接工厂,负责创建Connection。
Connection负责创建Session。
Session创建MessageProducer(用来发消息)和MessageConsumer(用来接收消息)。
Destination是消息的目的地。
详细的可以网上找些JMS规范(有中文版)。
下载apache-activemq-5.3.0。http://activemq.apache.org/download.html,解压,然后双击bin/activemq.bat。运行后,可以在http://localhost:8161/admin观察。也有demo,http://localhost:8161/demo。把activemq-all-5.3.0.jar加入classpath。
Jms发送代码:
publicstaticvoidmain(String[]args)throwsException{
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
Connectionconnection=connectionFactory.createConnection();
connection.start();
Sessionsession=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Destinationdestination=session.createQueue("my-queue");
MessageProducerproducer=session.createProducer(destination);
for(inti=0;i<3;i++){
MapMessagemessage=session.createMapMessage();
message.setLong("count",newDate().getTime());
Thread.sleep(1000);
//通过消息生产者发出消息
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
Jms接收代码:
publicstaticvoidmain(String[]args)throwsException{
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
Connectionconnection=connectionFactory.createConnection();
connection.start();
finalSessionsession=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Destinationdestination=session.createQueue("my-queue");
MessageConsumerconsumer=session.createConsumer(destination);
/*//listener方式
consumer.setMessageListener(newMessageListener(){
publicvoidonMessage(Messagemsg){
MapMessagemessage=(MapMessage)msg;
//TODOsomething....
System.out.println("收到消息:"+newDate(message.getLong("count")));
session.commit();
}
});
Thread.sleep(30000);
*/
inti=0;
while(i<3){
i++;
MapMessagemessage=(MapMessage)consumer.receive();
session.commit();
//TODOsomething....
System.out.println("收到消息:"+newDate(message.getLong("count")));
}
session.close();
connection.close();
}
启动JmsReceiver和JmsSender可以在看输出三条时间信息。当然Jms还指定有其它格式的数据,如TextMessage
结合Spring的JmsTemplate方便用:
xml:
<?xmlversion="1.0"encoding="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<!--在非web/ejb容器中使用pool时,要手动stop,spring不会为你执行destroy-method的方法
<beanid="jmsFactory"class="org.apache.activemq.pool.PooledConnectionFactory"destroy-method="stop">
<propertyname="connectionFactory">
<beanclass="org.apache.activemq.ActiveMQConnectionFactory">
<propertyname="brokerURL"value="tcp://localhost:61616"/>
</bean>
</property>
</bean>
-->
<beanid="jmsFactory"class="org.apache.activemq.ActiveMQConnectionFactory">
<propertyname="brokerURL"value="tcp://localhost:61616"/>
</bean>
<beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
<propertyname="connectionFactory"ref="jmsFactory"/>
<propertyname="defaultDestination"ref="destination"/>
<propertyname="messageConverter">
<beanclass="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<beanid="destination"class="org.apache.activemq.command.ActiveMQQueue">
<constructor-argindex="0"value="my-queue"/>
</bean>
</beans>
sender:
publicstaticvoidmain(String[]args){
ApplicationContextctx=newFileSystemXmlApplicationContext("classpath:app*.xml");
JmsTemplatejmsTemplate=(JmsTemplate)ctx.getBean("jmsTemplate");
jmsTemplate.send(newMessageCreator(){
publicMessagecreateMessage(Sessionsession)throwsJMSException{
MapMessagemm=session.createMapMessage();
mm.setLong("count",newDate().getTime());
returnmm;
}
});
}
receiver:
publicstaticvoidmain(String[]args){
ApplicationContextctx=newFileSystemXmlApplicationContext("classpath:app*.xml");
JmsTemplatejmsTemplate=(JmsTemplate)ctx.getBean("jmsTemplate");
while(true){
Map<String,Object>mm=(Map<String,Object>)jmsTemplate.receiveAndConvert();
System.out.println("收到消息:"+newDate((Long)mm.get("count")));
}
}
注意:直接用Jms接口时接收了消息后要提交一下,否则下次启动接收者时还可以收到旧数据。有了JmsTemplate就不用自己提交session.commit()了。如果使用了PooledConnectionFactory要把apache-activemq-5.3.0\lib\optional\activemq-pool-5.3.0.jar加到classpath