ActiveMQ入门实例

  ActiveMQ入门实例

在这里利用ActiveMQ作为消息中间件来发送点对点(PTP)的文本消息,以便对JMS有入门级的认识。运行示例前需要下载ActiveMQ,进入${active-mq}\bin目录运行activemq.bat启动消息服务;编译和运行代码还需要对应版本的activemq-all jar包和jms.jar。

消息发送方QSender通过键盘输入文本信息来发送消息,接收方QReceiver实现MessageListener接口异步接收消息并打印。代码如下:

发送方

importjava.io.BufferedReader;

importjava.io.InputStreamReader;

importjavax.jms.Queue;

importjavax.jms.QueueConnection;

importjavax.jms.QueueConnectionFactory;

importjavax.jms.QueueSender;

importjavax.jms.QueueSession;

importjavax.jms.Session;

importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnectionFactory;

importorg.apache.activemq.command.ActiveMQQueue;

publicclassQSender

{

publicstaticvoidmain(String[]args)

{

newQSender().send();

}

publicvoidsend()

{

BufferedReaderbr=newBufferedReader(newInputStreamReader(System.in));

try

{

//initconnectionfactorywithactivemq

QueueConnectionFactoryfactory=newActiveMQConnectionFactory("tcp://127.0.0.1:61616");

//specifythedestination

Queuequeue=newActiveMQQueue("queue.name.sample");

//createconnection,session,produceranddelivermessage

QueueConnectionconn=factory.createQueueConnection();

QueueSessionsession=conn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);

QueueSendersender=session.createSender(queue);

StringmsgText=null;

while(true)

{

System.out.println("entermessagetosendor'quit'");

msgText=br.readLine();

if("quit".equals(msgText))

break;

TextMessagemsg=session.createTextMessage(msgText);

sender.send(msg);

}

System.out.println("exiting..");

//closeopenedresources

br.close();

conn.close();

}

catch(Exceptione)

{

e.printStackTrace();

System.exit(1);

}

}

}

接收方

importjavax.jms.JMSException;

importjavax.jms.Message;

importjavax.jms.MessageListener;

importjavax.jms.Queue;

importjavax.jms.QueueConnection;

importjavax.jms.QueueReceiver;

importjavax.jms.QueueSession;

importjavax.jms.Session;

importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnectionFactory;

importorg.apache.activemq.command.ActiveMQQueue;

// implements MessageListener in order to receive messages asynchronously

publicclassQReceiverimplementsMessageListener

{

//indicatewhetherstopingreceiving

privatebooleanstop=false;

publicstaticvoidmain(String[]args)

{

newQReceiver().receive();

}

publicvoidreceive()

{

try

{

//thesameasQSender

QueueConnectionconn=newActiveMQConnectionFactory("tcp://127.0.0.1:61616").createQueueConnection();

Queuequeue=newActiveMQQueue("queue.name.sample");

QueueSessionsession=conn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);

QueueReceiverreceiver=session.createReceiver(queue);

//Setsthemessageconsumer'sMessageListener.

receiver.setMessageListener(this);

//readytosetupconsumer,begintoreceivemessage

conn.start();

while(!stop)

{

Thread.sleep(1000);

}

System.out.println("exiting..");

conn.close();

}

catch(Exceptione)

{

e.printStackTrace();

System.exit(1);

}

}

// called when a message is received

@Override

publicvoidonMessage(Messagemsg)

{

try

{

Stringtext=((TextMessage)msg).getText();

System.out.println(text);

if(text.equals("stop"))

stop=true;

}

catch(JMSExceptione)

{

e.printStackTrace();

stop=true;

}

相关推荐