ActiveMQ编程实例

    本文主要展示如何使用activeMQ进行程序设计,可以作为代码实例参考;此后会继续补充有关JMS 或ActiveMQ的优化和架构部分。

    本实例主要展示如何使用Queue。

一.pom.xml

<dependencies>
    	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context</artifactId>
		<version>3.2.3.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-jms</artifactId>
		<version>3.2.3.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>commons-lang</groupId>
		<artifactId>commons-lang</artifactId>
		<version>2.4</version>
	</dependency>
	<dependency>
		<groupId>javax.jms</groupId>
		<artifactId>jms</artifactId>
		<version>1.1</version>
	</dependency>
	<dependency>
		<groupId>org.apache.qpid</groupId>
		<artifactId>proton-jms</artifactId>
		<version>0.3</version>
	</dependency>
           
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-all</artifactId>
		<version>5.8.0</version>
	</dependency>
</dependencies>
<build>
	<finalName>test-jms-1.0</finalName>
	<resources>
	    <resource>
		<directory>src/main/resources</directory>
		<filtering>true</filtering>
	    </resource>
	</resources>
</build>
    其中“proton-jms”是需要声明的,否则可能无法正常运行。

二.Java实例(非spring环境)

    对JMS的程序部分,推荐使用JNDI + 异步listener方式;所以接下来的例子将采取此方式。

    1) jndi.properties----[/src/main/resources/jndi.properties]

###contextFactory
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
###brokerUrl,any protocol
java.naming.provider.url = tcp://localhost:61616
##username
##java.naming.security.principal=
##password
##java.naming.security.credentials=
##connectionFactory,for building sessions
connectionFactoryNames = QueueCF,TopicCF
##topic.<topicName> = <physicalName-of-topic>
##your application should use <topicName>,such as:
## context.lookup("topic1");
##It can be more than once
topic.topic1 = jms.topic1
##queue.<topicName> = <physicalName-of-queue>
queue.queue1 = jms.queue1
   
    2) QueueMessageListener.java(消息异步监听器)
public class QueueMessageListener implements MessageListener{

	public void onMessage(Message message) {
		if(message == null){
			return;
		}
		try{
			if(message instanceof TextMessage){
				String text = ((TextMessage)message).getText();
				System.out.println("-----JMS Message header-----");
				//message的关联id,可以在发送消息时指定,用于描述消息的关联性
				System.out.println("CorrelationID :" + message.getJMSCorrelationID());
				//消息的“传送模式”,1:非持久,2:持久
				System.out.println("DeliveryMode :" + message.getJMSDeliveryMode());
				//消息的过期时间,毫秒数;如果在发送消息时指定了timeToLive,此值为timestap + timeToLive
				System.out.println("Expiration :" + message.getJMSExpiration());
				//消息ID,全局唯一
				System.out.println("MessageID :" + message.getJMSMessageID());
				//消息权重,参考属性
				System.out.println("Priority :" + message.getJMSPriority());
				//是否为“重发”;当一个消息发送给消费者之后,未能收到“确认”,将会导致消息重发
				System.out.println("Redelivered :" +message.getJMSRedelivered());
				//消息创建的时间戳,当消息发送时被赋值。
				System.out.println("Timestamp :" + message.getJMSTimestamp());
				//消息的类型
				System.out.println("Type :" + message.getJMSType());
				System.out.println("-----Message Properties-----");
				Enumeration<String> names = message.getPropertyNames();
				if(names != null){
					while(names.hasMoreElements()){
						String key = names.nextElement();
						System.out.println(key + ":" + message.getStringProperty(key));
					}
				}
				System.out.println(">>>>" + text);
			}
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}
  
    3) SimpleConsumer.java(消息消费者)

     因为我们已经使用了“MessageListener”来异步接受消息,事实上JMS的实现中已经开启了单独的线程用来从网络中接受消息,并逐次调用onMessage方法;此处我们没有必要再次额外的创建线程。

public class SimpleConsumer {

	private Connection connection;
	private Session session;
	private MessageConsumer consumer;
	
	private boolean isStarted;
	
	public SimpleConsumer(MessageListener listener) throws Exception{
		Context context = new InitialContext();
		ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
		connection = connectionFactory.createConnection();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination queue = (Queue)context.lookup("queue1");
		consumer = session.createConsumer(queue);
		consumer.setMessageListener(listener);
	}
	
	
	public synchronized boolean start(){
		if(isStarted){
			return true;
		}
		try{
			connection.start();//very important
			isStarted = true;
			return true;
		}catch(Exception e){
			return false;
		}
	}
	
	public synchronized void close(){
		isStarted = false;
		try{
			session.close();
			connection.close();
		}catch(Exception e){
			//
		}
	}
}
  
    4) SimpleProductor.java(消息生产者)
public class SimpleProductor {

	private MessageProducer producer;
	private Session session;
	private Connection connection;
	private boolean isOpen = true;
	
	public SimpleProductor() throws Exception{
		Context context = new InitialContext();
		ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
		connection = connectionFactory.createConnection();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination queue = (Queue)context.lookup("queue1");
		producer = session.createProducer(queue);
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		
	}
	
	
	public boolean send(String message) {
		if(!isOpen){
			throw new RuntimeException("session has been closed!");
		}
		try{
			producer.send(session.createTextMessage(message));
			return true;
		}catch(Exception e){
			return false;
		}
	}
	
	public synchronized void close(){
		try{
			if(isOpen){
				isOpen = false;
			}
			session.close();
			connection.close();
		}catch (Exception e) {
			//
		}
	}
	
}

    上面的程序,基本上可以完成简单的消息发送和接受,此外,还有一种不常用的方式:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(QUEUE);
MessageProducer producer = session.createProducer(queue);
//MessageConsumer consumer = session.createConsumer(queue)
//同步接收消息
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
while(true){
	Message message = consumer.receive(10000);
	if(message == null){
		continue;
	}
	if(message instanceof TextMessage){
		//...
	}
}

    5) 测试方法:

SimpleProductor productor = new SimpleProductor();
productor.start();
for(int i=0; i<10; i++){
	productor.send("message content:" + i);
}
productor.close();
SimpleConsumer consumer = new SimpleConsumer(new QueueMessageListener());
consumer.start();

//consumer.close();

三.Spring-jms实例

    1) 配置文件:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
	<property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
	<constructor-arg index="0" value="queue1"></constructor-arg>
</bean>
<!-- productor -->
<bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
	<property name="connectionFactory" ref="connectionFactory"></property>
	<property name="defaultDestination" ref="queueDestination"></property>
	<!-- 是否在message中开启timestamp属性 -->
	<property name="messageTimestampEnabled" value="true"></property>
	<!-- 是否开启deliveryMode,priority,timeToLive消息附属属性 ,否则上述3个属性将采用默认值-->
	<property name="explicitQosEnabled" value="true"></property>
	<!-- NON_PERSISTENT = 1,PERSISTENT = 2,默认值为2-->
	<property name="deliveryMode" value="2"></property>
	<!-- pubSubNoLocal,对于topic而言,还需要注意此选项:是否接受本地消息,当消费者和生产者公用一个connection时 -->
</bean>
<bean id="productor" class="com.test.jms.spring.impl.ProductorImpl">
	<property name="jmsTemplate" ref="queueTemplate"></property>
</bean>
<!-- MDB -->
<bean id="queueMessageListener" class="com.test.jms.object.QueueMessageListener"/>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"></property>
	<property name="destination" ref="queueDestination"></property>
	<property name="messageListener" ref="queueMessageListener"></property>
	<!-- 如果消息的接收速率,大于消息处理的速率时,可以采取线程池方式 
	<property name="taskExecutor" ref="queueMessageExecutor"></property>
	-->
	 <!--  -->
	<property name="concurrentConsumers" value="1"></property>
	<!-- [concurrentConsumers]-[maxConcurrentConsumers] -->
	<!--  
	<property name="concurrency" value="1-5"></property>
	-->
	
</bean>
<bean id="queueMessageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
	<property name="corePoolSize" value="2" />
	<property name="maxPoolSize" value="5" />
	<property name="daemon" value="true" />
	<property name="keepAliveSeconds" value="120" />
</bean>

    我们采用了spring推荐的方式:消息生产者基于jmsTemplate,消息消费者基于MDB(pojo的消息驱动bean);为了提升消息的接收者的吞吐能力,我们可以采取多线程的方式,其中有几个重要的参考配置:

    A) taskExecutor:消息接受端使用线程池的方式处理消息;当消息的网络输入速率大于消息的处理速率时,可以考虑采用此方式;在消息消费者的与JMS-Server的网络链接中,每收到一条消息,将会立即交付给线程池中的线程去执行,执行时仍然调用messageListener的方法;此处需要注意,线程池中的所有线程仍然共享一个messageListener实例,在采用线程池模式中,请注意线程安全问题。

    B) concurrentConsumers:并发运行的消费者个数,在默认情况下为一个“消息消费者”;事实上,一个consumer即为一个Session,多个consumer即为多个Session;不过它们底层仍然共享一个“tcp链接”;此配置项仍然是适用于“消息的网络输入速率大于消息的处理速率”的场景;每一个consumer都将会在单独的线程中运行,但是它们仍然共享一个messageListener实例;在此场景下,你无法绝对的保证,原本“有序”的消息在交付给多个consumer时被实际执行的顺序也是严格的。

    taskExecutor是一种额外的优化策略,concurrentConsumers则是采用了JMS原生的特性;在实际场景中,我们选择一种即可。如下为Spring-JMS是如何使用线程池处理消息的原理(基于封装的思想):

if (this.taskExecutor != null) {
	consumer.setMessageListener(new MessageListener() {
		public void onMessage(final Message message) {
			taskExecutor.execute(new Runnable() {
				public void run() {
					processMessage(message, session);
				}
			});
		}
	});
}
else {
	consumer.setMessageListener(new MessageListener() {
		public void onMessage(Message message) {
			processMessage(message, session);
		}
	});
}

    2) ProductorImpl.java

public class ProductorImpl implements Productor {

	private JmsTemplate jmsTemplate;
	
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public void send(final String message) {
		if(message == null){
			return;
		}
		jmsTemplate.send(new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}

	public void send(final Map<String, String> message) {
		if(message == null || message.isEmpty()){
			return;
		}
		jmsTemplate.send(new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				MapMessage mm = session.createMapMessage();
				for(Entry<String, String> entry : message.entrySet()){
					mm.setString(entry.getKey(), entry.getValue());
				}
				return mm;
			}
		});
		
	}

	
}

    非常的简单,我们不用书写和consumer有关的代码。一切就结束了。[备注,本文中的实例命名为Productor,只是为了避免和JMS中producer混淆]

相关推荐