activemq入门实例
一,下载activemq
http://mirror.bit.edu.cn/apache/activemq/5.12.1/apache-activemq-5.12.1-bin.tar.gz
二,启动activemq
./activemq start &
三,开启activemq端口
1,127.0.0.1:8161 2,127.0.0.1:61616
四,创建Queue
127.0.0.1:8161/admin
五,Sender.java
package com.my.common.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /********************************************************************************* //* Copyright (C) 2015 MAKUN. All Rights Reserved. //* //* Filename: Sender.java //* Revision: 1.0 //* Author: <makun> //* Created On: 2015年11月1日 //* Modified by: //* Modified On: //* //* Description: <description> /********************************************************************************/ public class Sender { private static final int SEND_NUMBER = 5; public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 MessageProducer producer; // TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://120.27.44.131:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值activemqTest是一个服务器的queue,须在在ActiveMq的console配置(http://120.27.44.131:8161/admin/) destination = session.createQueue("activemqTest"); // 得到消息生成者【发送者】 producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1 ; i <= SEND_NUMBER ; i++) { TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } } }
六,Receiver.java
package com.my.common.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /********************************************************************************* //* Copyright (C) 2015 MAKUN. All Rights Reserved. //* //* Filename: Receiver.java //* Revision: 1.0 //* Author: <makun> //* Created On: 2015年11月1日 //* Modified by: //* Modified On: //* //* Description: <description> /********************************************************************************/ public class Receiver { public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://120.27.44.131:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值activemqTest是一个服务器的queue,须在在ActiveMq的console配置(http://120.27.44.131:8161/admin/) destination = session.createQueue("activemqTest"); consumer = session.createConsumer(destination); while (true) { //设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(5000000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
七,测试过程
将receiver的接收时间设置长一些,先启动receiver,然后没有收到任何消息,当sender一启动,sender的控制台有如下消息
发送消息:ActiveMq 发送的消息1 发送消息:ActiveMq 发送的消息2 发送消息:ActiveMq 发送的消息3 发送消息:ActiveMq 发送的消息4 发送消息:ActiveMq 发送的消息5
receiver的控制台如下
收到消息ActiveMq 发送的消息1 收到消息ActiveMq 发送的消息2 收到消息ActiveMq 发送的消息3 收到消息ActiveMq 发送的消息4 收到消息ActiveMq 发送的消息5
八,Queue与Topic的比较
1、JMSQueue执行loadbalancer语义:
一条消息仅能被一个consumer(消费者)收到。如果在message发送的时候没有可用的
consumer,那么它将被保存一直到能处理该message的consumer可用。如果一
个consumer收到一条message后却不响应它,那么这条消息将被转到另一个
consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer
中负载均衡。
注:
点对点消息传递域的特点如下: • 每个消息只能有一个消费者。 • 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发 送消息的时候是否处于运行状态,它都可以提取消息。
2、Topic实现publish和subscribe语义:
一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber
将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的
subscriber能够获得消息的一个拷贝。
注:
发布/订阅消息传递域的特点如下: • 每个消息可以有多个消费者。 • 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费 自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程 度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激 活状态时发送的消息。
3、分别对应两种消息模式:
Point-to-Point(点对点),Publisher/SubscriberModel(发布/订阅者)其中在Publicher/Subscriber模式下又有Nondurablesubscription(非持久订阅)
和durablesubscription(持久化订阅)2种消息处理方式(支持离线消息)。
注:
在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递 域中,目的地被成为主题(topic)。
注:本文摘自作者:xwdreamer出处:http://www.cnblogs.com/xwdreamer