jms-activeMQ

两篇文章和自己的测试体验及结合spring的配置

1.JMS介绍

JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMSAPI定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。

1)JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已经是J2EEAPI的一部分,J2EE服务器都提供JMS服务。

2)消息管理对象提供对消息进行操作的API。JMSAPI中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。

3)消息的生产者和消费者。消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queuereceiver)

4)消息。消息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:

消息头(header)――JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。

属性(property)――用来添加删除消息头以外的附加信息。

消息体(body)――JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

2.Messages通信方式

上面提到JMS通信方式分为点对点通信和发布/订阅方式

1)点对点方式(point-to-point)

点对点的消息发送方式主要建立在MessageQueue,Sender,reciever上,MessageQueue存贮消息,Sneder发送消息,receive接收消息.具体点就是SenderClient发送MessageQueue,而receiverCliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行

2)发布/订阅方式(publish/subscriberMessaging)

发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber,在接收消息时有两种方法,destination的receive方法,和实现messagelistener接口的onMessage方法。

3.为什么选用ActiveMQ

1)ActiveMQ是一个开放源码

2)基于Apache2.0licenced发布并实现了JMS1.1。

3)ActiveMQ现在已经和作为很多项目的异步消息通信核心了

4)在很多中小型项目中采用ActiveMQ+SPRING+TOMCAT开发模式。

4.编程模式

4.1消息产生者向JMS发送消息的步骤

(1)创建连接使用的工厂类JMSConnectionFactory

(2)使用管理对象JMSConnectionFactory建立连接Connection

(3)使用连接Connection建立会话Session

(4)使用会话Session和管理对象Destination创建消息生产者MessageSender

(5)使用消息生产者MessageSender发送消息

4.2消息消费者从JMS接受消息的步骤

(1)创建连接使用的工厂类JMSConnectionFactory

(2)使用管理对象JMSConnectionFactory建立连接Connection

(3)使用连接Connection建立会话Session

(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver

(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver

消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

5.ActiveMQ运行

a.去官方网站下载:http://activemq.apache.org/

b.解压缩apache-activemq-5.7.0-bin.zip,然后运行文件下\bin\activemq.bat。

c.创建project:ActiveMQ,并导入根目录下的activemq-all-5.7.0.jar

启动ActiveMQ以后,登陆:http://localhost:8161/admin/

ActiveMQ,提供一个demo应用和用于监控ActiveMQ的admin应用。

打开http://localhost:8161/admin/queues.jsp,可以查看相应的queue中是否有消息

6、简单实例代码

Sender.java

package com.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            // 创建queue
            destination = session.createQueue("FirstQueue");
            // 得到消息生成者【发送者】
            producer = session.createProducer(destination);
            // 设置不持久化,此处学习,实际根据项目决定
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 构造消息,此处写死,项目就是参数,或者方法获取
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }
}

Receiver.java

package com.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            // 创建queue
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                //设置接收者接收消息的时间,为了便于测试,这里谁定为100s
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

7、与spring结合配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

	<description>JMS简单应用配置</description>

	<!-- ActiveMQ 连接工厂 -->
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616" />
	</bean>

	<!-- Spring Caching 连接工厂 -->
	<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="connectionFactory" />
		<property name="sessionCacheSize" value="10" />
	</bean>

	<!-- Queue定义 -->
	<bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="q.notify" />
	</bean>

	<!-- Topic定义 -->
	<bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="t.notify" />
	</bean>

	<!-- Spring JMS Template -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="cachingConnectionFactory" />
	</bean>

	<!-- 使用Spring JmsTemplate的消息生产者 -->
	<bean id="notifyMessageProducer" class="com.ambow.lmsx.jms.NotifyMessageProducer">
		<property name="jmsTemplate" ref="jmsTemplate" />
		<property name="notifyQueue" ref="notifyQueue" />
		<property name="notifyTopic" ref="notifyTopic" />
	</bean>

	<!-- 异步接收Queue消息Container -->
	<bean id="queueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="notifyQueue" />
		<property name="messageListener" ref="notifyMessageListener" />
		<property name="concurrentConsumers" value="10" />
	</bean>

	<!-- 异步接收Topic消息Container -->
	<bean id="topicContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="notifyTopic" />
		<property name="messageListener" ref="notifyMessageListener" />
	</bean>

	<!-- 异步接收消息处理类 -->
	<bean id="notifyMessageListener" class="com.ambow.lmsx.jms.NotifyMessageListener" />
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

	<description>JMS高级应用配置(NON_PERSISTENT,DURIABLE,SELECTOR)</description>

	<!-- ActiveMQ 连接工厂 -->
	<bean id="advancedConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616" />
		<!-- 对PERSISTENT的消息进行异步发送(NON_PERSISTENT消息默认异步发送) -->
		<!-- <property name="useAsyncSend" value="true" /> -->
	</bean>

	<!-- 持久化主题订阅者ActiveMQ 连接工厂 -->
	<bean id="advancedTopicConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616" />
		<!-- Durable订阅者必须设置ClientId -->
		<property name="clientID" value="durableTopicListenerDemo" />
	</bean>

	<!-- Spring Caching 连接工厂 -->
	<bean id="advancedCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="advancedConnectionFactory" />
		<property name="sessionCacheSize" value="10" />
	</bean>

	<!-- Queue定义 -->
	<bean id="advancedNotifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="q.advanced.notify" />
	</bean>

	<!-- Topic定义 -->
	<bean id="advancedNotifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="t.advanced.notify" />
	</bean>

	<!-- Spring JMS Template -->
	<bean id="advancedJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="advancedCachingConnectionFactory" />
		<!-- 使 deliveryMode, priority, timeToLive设置生效-->
		<property name="explicitQosEnabled" value="true" />
		<!-- 设置NON_PERSISTENT模式, 默认为PERSISTENT -->
		<property name="deliveryPersistent" value="false" />
		<!-- 设置优先级, 默认为4 -->
		<property name="priority" value="9" />
	</bean>

	<!-- 使用Spring JmsTemplate的消息生产者 -->
	<bean id="advancedNotifyMessageProducer" class="com.ambow.lmsx.jms.AdvancedNotifyMessageProducer">
		<property name="jmsTemplate" ref="advancedJmsTemplate" />
		<property name="notifyQueue" ref="advancedNotifyQueue" />
		<property name="notifyTopic" ref="advancedNotifyTopic" />
	</bean>

	<!-- 异步接收Queue消息Container -->
	<bean id="advancedQueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="advancedConnectionFactory" />
		<property name="destination" ref="advancedNotifyQueue" />
		<property name="messageListener" ref="advancedNotifyMessageListener" />
		<!-- 初始5个Consumer, 可动态扩展到10 -->
		<property name="concurrentConsumers" value="5" />
		<property name="maxConcurrentConsumers" value="10" />
		<!-- 设置消息确认模式为Client -->
		<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
	</bean>

	<!-- 异步接收Topic消息Container -->
	<bean id="advancedTopicContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="advancedTopicConnectionFactory" />
		<property name="destination" ref="advancedNotifyTopic" />
		<property name="messageListener" ref="advancedNotifyMessageListener" />
		<!-- JMS Selector消息过滤器 -->
		<property name="messageSelector" value="objectType='user'" />
		<!-- 持久化订阅者 -->
		<property name="subscriptionDurable" value="true" />
	</bean>

	<!-- 异步接收消息处理类 -->
	<bean id="advancedNotifyMessageListener" class="com.ambow.lmsx.jms.AdvancedNotifyMessageListener" />
</beans>

相关推荐