使用 ActiveMQ 示例

企业中各项目中相互协作的时候可能用得到消息通知机制。比如有东西更新了,可以通知做索引。

在Java里有JMS的多个实现。其中apache下的ActiveMQ就是不错的选择。还有一个比较热的是RabbitMQ(是erlang语言实现的)。这里示例下使用ActiveMQ

用ActiveMQ最好还是了解下JMS

JMS公共点对点域发布/订阅域

ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory

ConnectionQueueConnectionTopicConnection

DestinationQueueTopic

SessionQueueSessionTopicSession

MessageProducerQueueSenderTopicPublisher

MessageConsumerQueueReceiverTopicSubscriber

JMS定义了两种方式:Quere(点对点);Topic(发布/订阅)。

ConnectionFactory是连接工厂,负责创建Connection。

Connection负责创建Session。

Session创建MessageProducer(用来发消息)和MessageConsumer(用来接收消息)。

Destination是消息的目的地。

详细的可以网上找些JMS规范(有中文版)。

下载apache-activemq-5.3.0。http://activemq.apache.org/download.html,解压,然后双击bin/activemq.bat。运行后,可以在http://localhost:8161/admin观察。也有demo,http://localhost:8161/demo。把activemq-all-5.3.0.jar加入classpath。

Jms发送代码:

publicstaticvoidmain(String[]args)throwsException{

ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();

Connectionconnection=connectionFactory.createConnection();

connection.start();

Sessionsession=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

Destinationdestination=session.createQueue("my-queue");

MessageProducerproducer=session.createProducer(destination);

for(inti=0;i<3;i++){

MapMessagemessage=session.createMapMessage();

message.setLong("count",newDate().getTime());

Thread.sleep(1000);

//通过消息生产者发出消息

producer.send(message);

}

session.commit();

session.close();

connection.close();

}

Jms接收代码:

publicstaticvoidmain(String[]args)throwsException{

ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();

Connectionconnection=connectionFactory.createConnection();

connection.start();

finalSessionsession=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

Destinationdestination=session.createQueue("my-queue");

MessageConsumerconsumer=session.createConsumer(destination);

/*//listener方式

consumer.setMessageListener(newMessageListener(){

publicvoidonMessage(Messagemsg){

MapMessagemessage=(MapMessage)msg;

//TODOsomething....

System.out.println("收到消息:"+newDate(message.getLong("count")));

session.commit();

}

});

Thread.sleep(30000);

*/

inti=0;

while(i<3){

i++;

MapMessagemessage=(MapMessage)consumer.receive();

session.commit();

//TODOsomething....

System.out.println("收到消息:"+newDate(message.getLong("count")));

}

session.close();

connection.close();

}

启动JmsReceiver和JmsSender可以在看输出三条时间信息。当然Jms还指定有其它格式的数据,如TextMessage

结合Spring的JmsTemplate方便用:

xml:

<?xmlversion="1.0"encoding="UTF-8"?>

<beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

<!--在非web/ejb容器中使用pool时,要手动stop,spring不会为你执行destroy-method的方法

<beanid="jmsFactory"class="org.apache.activemq.pool.PooledConnectionFactory"destroy-method="stop">

<propertyname="connectionFactory">

<beanclass="org.apache.activemq.ActiveMQConnectionFactory">

<propertyname="brokerURL"value="tcp://localhost:61616"/>

</bean>

</property>

</bean>

-->

<beanid="jmsFactory"class="org.apache.activemq.ActiveMQConnectionFactory">

<propertyname="brokerURL"value="tcp://localhost:61616"/>

</bean>

<beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">

<propertyname="connectionFactory"ref="jmsFactory"/>

<propertyname="defaultDestination"ref="destination"/>

<propertyname="messageConverter">

<beanclass="org.springframework.jms.support.converter.SimpleMessageConverter"/>

</property>

</bean>

<beanid="destination"class="org.apache.activemq.command.ActiveMQQueue">

<constructor-argindex="0"value="my-queue"/>

</bean>

</beans>

sender:

publicstaticvoidmain(String[]args){

ApplicationContextctx=newFileSystemXmlApplicationContext("classpath:app*.xml");

JmsTemplatejmsTemplate=(JmsTemplate)ctx.getBean("jmsTemplate");

jmsTemplate.send(newMessageCreator(){

publicMessagecreateMessage(Sessionsession)throwsJMSException{

MapMessagemm=session.createMapMessage();

mm.setLong("count",newDate().getTime());

returnmm;

}

});

}

receiver:

publicstaticvoidmain(String[]args){

ApplicationContextctx=newFileSystemXmlApplicationContext("classpath:app*.xml");

JmsTemplatejmsTemplate=(JmsTemplate)ctx.getBean("jmsTemplate");

while(true){

Map<String,Object>mm=(Map<String,Object>)jmsTemplate.receiveAndConvert();

System.out.println("收到消息:"+newDate((Long)mm.get("count")));

}

}

注意:直接用Jms接口时接收了消息后要提交一下,否则下次启动接收者时还可以收到旧数据。有了JmsTemplate就不用自己提交session.commit()了。如果使用了PooledConnectionFactory要把apache-activemq-5.3.0\lib\optional\activemq-pool-5.3.0.jar加到classpath

相关推荐