activemq queue搭建
其他都一样,只记录不一样的地方
context.xml中JNDI的部分:
<!--
  JNDI entry that exposes the ActiveMQ queue MY.TEST.FOO.QUEUE to the web
  application under the name jms/queue/MyQueue.
  The attribute must be spelled "physicalName" (capital N): the ActiveMQ JNDI
  factory looks the destination name up under that exact, case-sensitive key.
-->
<Resource name="jms/queue/MyQueue"
          auth="Container"
          type="org.apache.activemq.command.ActiveMQQueue"
          factory="org.apache.activemq.jndi.JNDIReferenceFactory"
          physicalName="MY.TEST.FOO.QUEUE"/>
发送消息,通过servlet
// Sends one test message to the queue bound at jms/queue/MyQueue.
// Both the connection factory and the destination are resolved from the
// container's JNDI environment (java:comp/env).
InitialContext initCtx = new InitialContext();
Context envContext = (Context) initCtx.lookup("java:comp/env");
ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/ConnectionFactory");
// Resolve the destination before opening the connection, so a failed lookup
// cannot leave an open connection behind.
Destination destination = (Destination) envContext.lookup("jms/queue/MyQueue");

Connection connection = connectionFactory.createConnection();
try {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = session.createProducer(destination);

    // The payload is carried as a string property on an otherwise empty message.
    Message testMessage = session.createMessage();
    testMessage.setStringProperty("testKey", "testValue111");
    producer.send(testMessage);
} finally {
    // Closing the connection also closes its sessions and producers; without
    // this every request would leak a broker connection.
    connection.close();
}
接收消息,通过main方法
package com.activemqtest; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Reciever2 { protected Queue queue; protected String queueName = "MY.TEST.FOO.QUEUE"; protected String url = "tcp://192.168.91.128:61616"; protected int ackMode = Session.AUTO_ACKNOWLEDGE; public static void main(String[] args) { Reciever2 rec = new Reciever2(); try { rec.run(); } catch (Exception e) { e.printStackTrace(); } } public void run() throws JMSException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url); QueueConnection connection = (QueueConnection) connectionFactory .createTopicConnection(); connection.start(); MessageConsumer consumer = null; Session session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue(queueName); consumer = session.createConsumer(queue); System.out.println(" Waiting for message (max 3) "); for (int i = 0; i < 3; i++) { Message message = consumer.receive(); processMessage(message); } System.out.println("Closing connection"); consumer.close(); session.close(); connection.close(); } public void processMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println(message.getStringProperty("testKey")); } } catch (Exception e) { e.printStackTrace(); } } }
consumer可以不online,producer发送消息后,consumer再启动也可以收到消息
相关推荐
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20
胡献根 2020-04-10
onlylixiaobei 2020-04-10
方新德 2020-04-08
xuedabao 2020-03-30