JMS-activeMQ

使用场景:

1.通过JMSfacade更新数据库,完全解耦了客户端和服务器端处理过程。

2.与遗留老式系统接口

3.应付大访问量交易系统

4.协同多系统之间处理效率

5、数据同步等

ActiveMQ是Apache出品,是目前最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMSProvider实现。

特性及优势

1 、实现 JMS1.1 规范,支持 J2EE1.4 以上
2 、可运行于任何 jvm 和大部分 web 容器( ActiveMQ works great in any JVM )
3 、支持多种语言客户端( java, C, C++, AJAX, ACTIONSCRIPT 等等)
4 、支持多种协议( stomp , openwire , REST )
5 、良好的 spring 支持( ActiveMQ has great Spring Support )
6 、速度很快, JBossMQ 的十倍( ActiveMQ is very fast; often 10x faster than JBossMQ. )
7 、与 OpenJMS 、 JbossMQ 等开源 jms provider 相比, ActiveMQ 有 Apache 的支持,持续发展的优势明显。

2、ActiveMQ中的两种消息处理模式

Queue 与 Topic 的比较
1 、 JMS Queue 执行 load balancer 语义:
一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。
2 、 Topic 实现 publish 和 subscribe 语义:
一条消息被 publish 时,它将发到所有感兴趣的订阅者,所以零到多个 subscriber 将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的 subscriber 能够获得消息的一个拷贝。
3 、分别对应两种消息模式:
Point-to-Point ( 点对点 ),Publisher/Subscriber Model ( 发布 / 订阅者 )
其中在 Publicher/Subscriber 模式下又有 Nondurable subscription (非持久订阅)和 durable subscription ( 持久化订阅 )2 种消息处理方式。

接下来开始MQ的实例

1.下载ActiveMQ

去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ

解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。

3、配置conf\activemq.xmlweb管理界面启动IP和端口

  
<transportConnectors>
        <transportConnector name="openwire" uri="tcp://localhost:61616"/>
     </transportConnectors>

4、配置conf\jetty.xml通讯端口

 
<property name="connectors">
      <list>
        <bean id="Connector"  
               class="org.eclipse.jetty.server.nio.SelectChannelConnector">
            <property name="port" value="8161" />
         </bean>
       </list>
     </property>
     注:一般保持默认 不修改8161端口

5、在eclipse中新建项目,将apache-activemq-5.5.1\lib中所需要的jar包复制到项目中.

5.1ClassSender

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
	
	public static void main(String[] args) throws JMSException{
		ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
		        ActiveMQConnection.DEFAULT_USER,  
		        ActiveMQConnection.DEFAULT_PASSWORD,  
		        "tcp://localhost:61616");  
		  
		//连接到JMS提供者  
		Connection conn = connFactory.createConnection();  
		conn.start();  
		  
		//事务性会话,自动确认消息  
		Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
		  
		//消息的目的地  
		Destination destination = session.createQueue("queue.hello");  
		  
		//消息生产者  
		MessageProducer producer = session.createProducer(destination);  
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化
		
		//发送消息
		sendText(session, producer);
		
		session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地  
		producer.close();  
		session.close();  
		conn.close();  
	}
	
	
	//对象消息  
	public static void sendObject(Session session,MessageProducer producer) throws JMSException{
		User user = new User("cjm", "对象消息"); //User对象必须实现Serializable接口  
		ObjectMessage objectMessage = session.createObjectMessage();  
		objectMessage.setObject(user);  
		producer.send(objectMessage);  
	}
	
	//字节消息  
	public static void sendBytes(Session session,MessageProducer producer) throws JMSException{
		String s = "BytesMessage字节消息";  
		BytesMessage bytesMessage = session.createBytesMessage();  
		bytesMessage.writeBytes(s.getBytes());  
		producer.send(bytesMessage);  
	}
	 
	//流消息  
	public static void setStream(Session session,MessageProducer producer)throws JMSException{
		StreamMessage streamMessage = session.createStreamMessage();  
		streamMessage.writeString("streamMessage流消息");  
		streamMessage.writeLong(55);  
		producer.send(streamMessage);  
	}
	
	//键值对消息  
	public static void sendMap(Session session,MessageProducer producer)throws JMSException{
		MapMessage mapMessage = session.createMapMessage();  
		mapMessage.setLong("age", new Long(32));  
		mapMessage.setDouble("sarray", new Double(5867.15));  
		mapMessage.setString("username", "键值对消息");  
		producer.send(mapMessage);  
	}
	
	//文本消息  
	public static void sendText(Session session,MessageProducer producer)throws JMSException{  
		TextMessage textMessage = session.createTextMessage("文本消息");  
		producer.send(textMessage);  
	}
}

5.2ClassReceiver

 
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

    public class Receiver implements MessageListener{  
        private boolean stop = false;  
          
        public void execute() throws Exception {  
            //连接工厂  
            ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
                    ActiveMQConnection.DEFAULT_USER,  
                    ActiveMQConnection.DEFAULT_PASSWORD,  
                    "tcp://localhost:61616");  
              
            //连接到JMS提供者  
            Connection conn = connFactory.createConnection();  
            conn.start();  
              
            //事务性会话,自动确认消息  
            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
              
            //消息的来源地  
            Destination destination = session.createQueue("queue.hello");  
              
            //消息消费者  
            MessageConsumer consumer = session.createConsumer(destination);  
            consumer.setMessageListener(this);  
              
            //等待接收消息  
            while(!stop){  
                Thread.sleep(5000);  
            }  
              
            session.commit();  
              
            consumer.close();  
            session.close();  
            conn.close();  
        }  
      
        public void onMessage(Message m) {  
            try{  
                if(m instanceof TextMessage){ //接收文本消息  
                    TextMessage message = (TextMessage)m;  
                    System.out.println(message.getText());  
                }else if(m instanceof MapMessage){ //接收键值对消息  
                    MapMessage message = (MapMessage)m;  
                    System.out.println(message.getLong("age"));  
                    System.out.println(message.getDouble("sarray"));  
                    System.out.println(message.getString("username"));  
                }else if(m instanceof StreamMessage){ //接收流消息  
                    StreamMessage message = (StreamMessage)m;  
                    System.out.println(message.readString());  
                    System.out.println(message.readLong());  
                }else if(m instanceof BytesMessage){ //接收字节消息  
                    byte[] b = new byte[1024];  
                    int len = -1;  
                    BytesMessage message = (BytesMessage)m;  
                    while((len=message.readBytes(b))!=-1){  
                        System.out.println(new String(b, 0, len));  
                    }  
                }else if(m instanceof ObjectMessage){ //接收对象消息  
                    ObjectMessage message = (ObjectMessage)m;  
                    User user = (User)message.getObject();  
                    System.out.println(user.getName() + " _ " + user.getInfo());  
                }else{  
                    System.out.println(m);  
                }  
                  
                stop = true;  
            }catch(JMSException e){  
                stop = true;  
                e.printStackTrace();  
            }  
        }  
        
        
        public static void main(String[] args) throws Exception{
        	Receiver r = new Receiver();
        	r.execute();
        	
        }
    }

5.3、实体对象User,必须实现Serializable

import java.io.Serializable;

@SuppressWarnings("serial")
public class User implements Serializable{
	private String name;
	private String info;
   
	public User(String name,String info){
		this.name = name;
		this.info = info;
	}
	
	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getInfo() {
		return info;
	}

	public void setInfo(String info) {
		this.info = info;
	}

	 
}

5.4、运行serder与receiver......同事可以登录mq后台管理界面,看到相应的消息信息

http://ip地址:8161

相关推荐