JMS-activeMQ
使用场景:
1.通过JMSfacade更新数据库,完全解耦了客户端和服务器端处理过程。
2.与遗留老式系统接口
3.应付大访问量交易系统
4.协同多系统之间处理效率
5、数据同步等
ActiveMQ是Apache出品,是目前最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMSProvider实现。
特性及优势
1 、实现 JMS1.1 规范,支持 J2EE1.4 以上 2 、可运行于任何 jvm 和大部分 web 容器( ActiveMQ works great in any JVM ) 3 、支持多种语言客户端( java, C, C++, AJAX, ACTIONSCRIPT 等等) 4 、支持多种协议( stomp , openwire , REST ) 5 、良好的 spring 支持( ActiveMQ has great Spring Support ) 6 、速度很快, JBossMQ 的十倍( ActiveMQ is very fast; often 10x faster than JBossMQ. ) 7 、与 OpenJMS 、 JbossMQ 等开源 jms provider 相比, ActiveMQ 有 Apache 的支持,持续发展的优势明显。
2、ActiveMQ中的两种消息处理模式
Queue 与 Topic 的比较 1 、 JMS Queue 执行 load balancer 语义: 一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。 2 、 Topic 实现 publish 和 subscribe 语义: 一条消息被 publish 时,它将发到所有感兴趣的订阅者,所以零到多个 subscriber 将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的 subscriber 能够获得消息的一个拷贝。 3 、分别对应两种消息模式: Point-to-Point ( 点对点 ),Publisher/Subscriber Model ( 发布 / 订阅者 ) 其中在 Publicher/Subscriber 模式下又有 Nondurable subscription (非持久订阅)和 durable subscription ( 持久化订阅 )2 种消息处理方式。
接下来开始MQ的实例
1.下载ActiveMQ
去官方网站下载:http://activemq.apache.org/
2.运行ActiveMQ
解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。
3、配置conf\activemq.xmlweb管理界面启动IP和端口
<transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616"/> </transportConnectors>
4、配置conf\jetty.xml通讯端口
<property name="connectors"> <list> <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> <property name="port" value="8161" /> </bean> </list> </property> 注:一般保持默认 不修改8161端口
5、在eclipse中新建项目,将apache-activemq-5.5.1\lib中所需要的jar包复制到项目中.
5.1ClassSender
import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { public static void main(String[] args) throws JMSException{ ConnectionFactory connFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); //连接到JMS提供者 Connection conn = connFactory.createConnection(); conn.start(); //事务性会话,自动确认消息 Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); //消息的目的地 Destination destination = session.createQueue("queue.hello"); //消息生产者 MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化 //发送消息 sendText(session, producer); session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地 producer.close(); session.close(); conn.close(); } //对象消息 public static void sendObject(Session session,MessageProducer producer) throws JMSException{ User user = new User("cjm", "对象消息"); //User对象必须实现Serializable接口 ObjectMessage objectMessage = session.createObjectMessage(); objectMessage.setObject(user); producer.send(objectMessage); } //字节消息 public static void sendBytes(Session session,MessageProducer producer) throws JMSException{ String s = "BytesMessage字节消息"; BytesMessage bytesMessage = session.createBytesMessage(); bytesMessage.writeBytes(s.getBytes()); producer.send(bytesMessage); } //流消息 public static void setStream(Session session,MessageProducer producer)throws JMSException{ StreamMessage streamMessage = session.createStreamMessage(); streamMessage.writeString("streamMessage流消息"); streamMessage.writeLong(55); producer.send(streamMessage); } //键值对消息 public static void sendMap(Session session,MessageProducer producer)throws JMSException{ MapMessage mapMessage = session.createMapMessage(); mapMessage.setLong("age", new Long(32)); mapMessage.setDouble("sarray", new Double(5867.15)); mapMessage.setString("username", "键值对消息"); producer.send(mapMessage); } //文本消息 public static void sendText(Session session,MessageProducer producer)throws JMSException{ TextMessage textMessage = session.createTextMessage("文本消息"); producer.send(textMessage); } }
5.2ClassReceiver
import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver implements MessageListener{ private boolean stop = false; public void execute() throws Exception { //连接工厂 ConnectionFactory connFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); //连接到JMS提供者 Connection conn = connFactory.createConnection(); conn.start(); //事务性会话,自动确认消息 Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); //消息的来源地 Destination destination = session.createQueue("queue.hello"); //消息消费者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(this); //等待接收消息 while(!stop){ Thread.sleep(5000); } session.commit(); consumer.close(); session.close(); conn.close(); } public void onMessage(Message m) { try{ if(m instanceof TextMessage){ //接收文本消息 TextMessage message = (TextMessage)m; System.out.println(message.getText()); }else if(m instanceof MapMessage){ //接收键值对消息 MapMessage message = (MapMessage)m; System.out.println(message.getLong("age")); System.out.println(message.getDouble("sarray")); System.out.println(message.getString("username")); }else if(m instanceof StreamMessage){ //接收流消息 StreamMessage message = (StreamMessage)m; System.out.println(message.readString()); System.out.println(message.readLong()); }else if(m instanceof BytesMessage){ //接收字节消息 byte[] b = new byte[1024]; int len = -1; BytesMessage message = (BytesMessage)m; while((len=message.readBytes(b))!=-1){ System.out.println(new String(b, 0, len)); } }else if(m instanceof ObjectMessage){ //接收对象消息 ObjectMessage message = (ObjectMessage)m; User user = (User)message.getObject(); System.out.println(user.getName() + " _ " + user.getInfo()); }else{ System.out.println(m); } stop = true; }catch(JMSException e){ stop = true; e.printStackTrace(); } } public static void main(String[] args) throws Exception{ Receiver r = new Receiver(); r.execute(); } }
5.3、实体对象User,必须实现Serializable
import java.io.Serializable; @SuppressWarnings("serial") public class User implements Serializable{ private String name; private String info; public User(String name,String info){ this.name = name; this.info = info; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } }
5.4、运行serder与receiver......同事可以登录mq后台管理界面,看到相应的消息信息
http://ip地址:8161
相关推荐
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20
胡献根 2020-04-10
onlylixiaobei 2020-04-10
方新德 2020-04-08
xuedabao 2020-03-30