ActiveMQ入门实例
ActiveMQ入门实例
本文利用ActiveMQ作为消息中间件发送点对点(PTP)的文本消息,以便对JMS有入门级的认识。运行示例前需要下载ActiveMQ,进入${active-mq}\bin目录运行activemq.bat;工程中还需要引入对应版本的activemq-all的jar包和jms.jar。
消息发送方QSender通过键盘输入文本信息来发送消息,接收方QReceiver实现MessageListener接口,异步接收消息并打印。代码如下:
发送方
importjava.io.BufferedReader;
importjava.io.InputStreamReader;
importjavax.jms.Queue;
importjavax.jms.QueueConnection;
importjavax.jms.QueueConnectionFactory;
importjavax.jms.QueueSender;
importjavax.jms.QueueSession;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnectionFactory;
importorg.apache.activemq.command.ActiveMQQueue;
publicclassQSender
{
publicstaticvoidmain(String[]args)
{
newQSender().send();
}
publicvoidsend()
{
BufferedReaderbr=newBufferedReader(newInputStreamReader(System.in));
try
{
//initconnectionfactorywithactivemq
QueueConnectionFactoryfactory=newActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//specifythedestination
Queuequeue=newActiveMQQueue("queue.name.sample");
//createconnection,session,produceranddelivermessage
QueueConnectionconn=factory.createQueueConnection();
QueueSessionsession=conn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
QueueSendersender=session.createSender(queue);
StringmsgText=null;
while(true)
{
System.out.println("entermessagetosendor'quit'");
msgText=br.readLine();
if("quit".equals(msgText))
break;
TextMessagemsg=session.createTextMessage(msgText);
sender.send(msg);
}
System.out.println("exiting..");
//closeopenedresources
br.close();
conn.close();
}
catch(Exceptione)
{
e.printStackTrace();
System.exit(1);
}
}
}
接收方
importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.Queue;
importjavax.jms.QueueConnection;
importjavax.jms.QueueReceiver;
importjavax.jms.QueueSession;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnectionFactory;
importorg.apache.activemq.command.ActiveMQQueue;
// implements MessageListener in order to receive messages asynchronously
publicclassQReceiverimplementsMessageListener
{
// indicates whether to stop receiving
privatebooleanstop=false;
publicstaticvoidmain(String[]args)
{
newQReceiver().receive();
}
publicvoidreceive()
{
try
{
//thesameasQSender
QueueConnectionconn=newActiveMQConnectionFactory("tcp://127.0.0.1:61616").createQueueConnection();
Queuequeue=newActiveMQQueue("queue.name.sample");
QueueSessionsession=conn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
QueueReceiverreceiver=session.createReceiver(queue);
//Setsthemessageconsumer'sMessageListener.
receiver.setMessageListener(this);
//readytosetupconsumer,begintoreceivemessage
conn.start();
while(!stop)
{
Thread.sleep(1000);
}
System.out.println("exiting..");
conn.close();
}
catch(Exceptione)
{
e.printStackTrace();
System.exit(1);
}
}
// called when a message is received
@Override
publicvoidonMessage(Messagemsg)
{
try
{
Stringtext=((TextMessage)msg).getText();
System.out.println(text);
if(text.equals("stop"))
stop=true;
}
catch(JMSExceptione)
{
e.printStackTrace();
stop=true;
}