浅谈用activeMQ来实现JMS的异步传输

1.JMS介绍

JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMSAPI定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。

1)JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已经是J2EEAPI的一部分,J2EE服务器都提供JMS服务。

2)消息管理对象提供对消息进行操作的API。JMSAPI中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。

3)消息的生产者和消费者。消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queuereceiver)

4)消息。消息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:

消息头(header)――JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。

属性(property)――用来添加删除消息头以外的附加信息。

消息体(body)――JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

2.Messages通信方式

上面提到JMS通信方式分为点对点通信和发布/订阅方式

1)点对点方式(point-to-point)

点对点的消息发送方式主要建立在MessageQueue,Sender,reciever上,MessageQueue存贮消息,Sneder发送消息,receive接收消息.具体点就是SenderClient发送MessageQueue,而receiverCliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行

2)发布/订阅方式(publish/subscriberMessaging)

发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber,在接收消息时有两种方法,destination的receive方法,和实现messagelistener接口的onMessage方法。

3.为什么选用ActiveMQ

1)ActiveMQ是一个开放源码

2)基于Apache2.0licenced发布并实现了JMS1.1。

3)ActiveMQ现在已经和作为很多项目的异步消息通信核心了

4)在很多中小型项目中采用ActiveMQ+SPRING+TOMCAT开发模式。

4.编程模式

4.1消息产生者向JMS发送消息的步骤

(1)创建连接使用的工厂类JMSConnectionFactory

(2)使用管理对象JMSConnectionFactory建立连接Connection

(3)使用连接Connection建立会话Session

(4)使用会话Session和管理对象Destination创建消息生产者MessageSender

(5)使用消息生产者MessageSender发送消息

4.2消息消费者从JMS接受消息的步骤

(1)创建连接使用的工厂类JMSConnectionFactory

(2)使用管理对象JMSConnectionFactory建立连接Connection

(3)使用连接Connection建立会话Session

(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver

(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver

消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

5.ActiveMQ运行

ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。运行%activemq_home%bin/目录下的activemq.bat,之后你会看见如下一段话表示启动成功。

打开http://localhost:8161/admin/queues.jsp ,可以查看相应的queue中是否有消息

6.SendMessage(用于发送消息)

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SendMessage {
 private static final String url ="tcp://localhost:61616";
 private static final String QUEUE_NAME ="choice.queue";
 protected String expectedBody = "<hello>world!</hello>";
 public void sendMessage() throws JMSException{
  Connection connection =null;
  try{
   ActiveMQConnectionFactory connectionFactory =new ActiveMQConnectionFactory(url);
   connection = (Connection)connectionFactory.createConnection();
   connection.start();
   Session session = (Session)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   Destination destination = session.createQueue(QUEUE_NAME);
   MessageProducer producer = session.createProducer(destination);
   TextMessage message = session.createTextMessage(expectedBody);
   message.setStringProperty("headname", "remoteB");
   producer.send(message);    
  }catch(Exception e){
   e.printStackTrace();
  }
 }
 
 public static void main(String[] args){
  SendMessage sndMsg = new SendMessage();
  try{
   sndMsg.sendMessage();
  }catch(Exception ex){
   System.out.println(ex.toString());
  }
 }
}

7.ReceiveMessage(用于接收消息)

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ReceiveMessage {
 private static final String url = "tcp://localhost:61616";
 private static final String QUEUE_NAME = "choice.queue";
 public void receiveMessage() {
  Connection connection = null;
  try {
   try {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
    connection = connectionFactory.createConnection();
   } catch (Exception e) {
                System.out.println(e.toString());
   }
   connection.start();
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);
   Destination destination = session.createQueue(QUEUE_NAME);
   MessageConsumer consumer = session.createConsumer(destination);
   consumeMessagesAndClose(connection, session, consumer);
  } catch (Exception e) {
    System.out.println(e.toString());
  }
 }

 protected void consumeMessagesAndClose(Connection connection,Session session, MessageConsumer consumer)
 throws JMSException {
  for (int i = 0; i < 1;) {
   Message message = consumer.receive(1000);
   if (message != null) {
    i++;
    onMessage(message);
   }
  }
  System.out.println("Closing connection");
  consumer.close();
  session.close();
  connection.close();
 }

 public void onMessage(Message message) {
  try {
   if (message instanceof TextMessage) {
    TextMessage txtMsg = (TextMessage) message;
    String msg = txtMsg.getText();
    System.out.println("Received: " + msg);
   }
  } catch (Exception e) {
   e.printStackTrace();
  }

 }

 public static void main(String args[]) {
  ReceiveMessage rm = new ReceiveMessage();
  rm.receiveMessage();
 }

}

相关推荐