activemq topic 搭建

最近需要用到activemq的topic发布订阅功能

activemq可以到官网下载,下载完成后启动很简单,bin/activemq start

到时候可以访问管理控制台,密码是admin/admin,

http://192.168.91.128:8161/admin/topics.jsp,     128是我的部署activemq的ip地址,用的是redhat6.4

mq broker搭建好之后,我们首先需要开发publish程序,这里我们通过tomcat的jndi来完成,

新建一个web项目,然后META-INF里面创建context.xml,内容如下,

<Context antiJARLocking="true">
    <Resource
        name="jms/ConnectionFactory"
        auth="Container"
        type="org.apache.activemq.ActiveMQConnectionFactory"
        description="JMS Connection Factory"
        factory="org.apache.activemq.jndi.JNDIReferenceFactory"
        brokerURL="tcp://192.168.91.128:61616"
        brokerName="LocalActiveMQBroker"
        useEmbeddedBroker="false"/>
 
    <Resource name="jms/topic/MyTopic"
        auth="Container"
        type="org.apache.activemq.command.ActiveMQTopic"
        factory="org.apache.activemq.jndi.JNDIReferenceFactory"
        physicalName="MY.TEST.FOO"/>
</Context>

 完成之后写个servlet来获取jnid中配置的factory和topic信息,代码如下

InitialContext initCtx = new InitialContext();
		    Context envContext = (Context) initCtx.lookup("java:comp/env");
		    ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/ConnectionFactory");
		    Connection connection = connectionFactory.createConnection();
		    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		    MessageProducer producer = session.createProducer((Destination) envContext.lookup("jms/topic/MyTopic"));
		 
		    Message testMessage = session.createMessage();
		    testMessage.setStringProperty("testKey", "testValue111");
		    producer.send(testMessage);

deploy应用之前,需要将activemq-all-5.9.0.jar放入到tomcat/lib下面,启动tomcat,接着部署刚才的 servlet应用,启动tomcat后topic不会被自动创建,然后可以访问这个servlet,会发现topic会被创建,可以观察控制台

接着是消费者subscriber,

下面是普通的main方法来消费topic

package com.activemqtest;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Reciever {

	protected Topic queue;

	protected String queueName = "MY.TEST.FOO";

	protected String url = "tcp://192.168.91.128:61616";

	protected int ackMode = Session.AUTO_ACKNOWLEDGE;

	public static void main(String[] args) {
		Reciever rec = new Reciever();
		try {
			rec.run();
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	public void run() throws JMSException {

		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				url);
		TopicConnection connection = (TopicConnection) connectionFactory
				.createTopicConnection();

		connection.start();
		MessageConsumer consumer = null;
		Session session = connection.createTopicSession(false,
				Session.AUTO_ACKNOWLEDGE);
		queue = session.createTopic(queueName);
		consumer = session.createConsumer(queue);

		System.out.println(" Waiting for message (max 5) ");

		for (int i = 0; i < 3; i++) {
			Message message = consumer.receive();
			processMessage(message);

		}

		System.out.println("Closing connection");
		consumer.close();
		session.close();
		connection.close();

	}

	public void processMessage(Message message) {

		try {

			 if (message instanceof TextMessage) {
                 TextMessage textMessage = (TextMessage) message;
                 String text = textMessage.getText();
                 System.out.println("Received: " + text);
             } else {
                 System.out.println(message.getStringProperty("testKey"));
             }

		} catch (Exception e) {

			e.printStackTrace();

		}
	}
}

总结:

Topics

In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.

可以发现topic的消费者需要active,你不能等到publish完成之后,你启动subscriber,这样是接受不到消息的,但是queue的话不一样,不一定active才可以接受消息

另外有个问题是,tomcat只是一个servlet容器,并没有实现很多的j2ee标准比如jms,所以里面是没有javax.jms下面这些package的,需要单独下载J2ee.jar

相关推荐