RabbitMQ(5)Java Client - Publish/Subscribe
We will deliver each message to multiple consumers. This pattern is known as "publish/subscribe".
Exchanges
The producer does not send messages directly to the queues. An exchange sits between them, and the exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? The rules for that are defined by the exchange type.
There are a few exchange types available: direct, topic, headers and fanout.
channel.exchangeDeclare("logs", "fanout");
A fanout exchange simply broadcasts every message it receives to all the queues it knows.
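To make the fanout rule concrete, here is a minimal in-memory sketch. It is a toy model only, not the RabbitMQ client API: the class FanoutExchangeSketch and its methods bind and publish are hypothetical names invented for illustration. It shows that the routing key is ignored and that a message published while no queue is bound is simply dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy in-memory model of fanout routing (hypothetical, not the RabbitMQ API).
class FanoutExchangeSketch {
    // Queues currently bound to this exchange.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    // Bind a queue so that it receives a copy of every later message.
    void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    // Copy the message to every bound queue; the routing key is ignored.
    // Returns the number of queues reached; 0 means the message was discarded.
    int publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
        return boundQueues.size();
    }
}
```

With two queues bound, one call to publish puts the same message in both of them. A call made before any bind returns 0 and the message is gone.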
We can use this command to list all the exchanges on our server:
> sudo sbin/rabbitmqctl list_exchanges
Nameless exchange
Before, we sent our messages using the default exchange, which we identify by the empty string ("").
channel.basicPublish("", "hello", null, message.getBytes());
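With the default exchange, the routing key is treated as a queue name: the message goes to the queue with that name, if it exists. The sketch below models only that rule in memory. DefaultExchangeSketch, declareQueue and publish are hypothetical names for illustration and are not part of the RabbitMQ client.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of the default ("") exchange (hypothetical names).
class DefaultExchangeSketch {
    // Every declared queue is reachable under its own name.
    private final Map<String, Queue<String>> queuesByName = new HashMap<>();

    // Declare a queue, or return the existing one with that name.
    Queue<String> declareQueue(String name) {
        return queuesByName.computeIfAbsent(name, n -> new ArrayDeque<>());
    }

    // Route by treating the routing key as a queue name.
    // Returns false when no queue has that name and the message is dropped.
    boolean publish(String routingKey, String message) {
        Queue<String> queue = queuesByName.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(message);
        return true;
    }
}
```

This is why the earlier call with the routing key "hello" reached the queue named "hello" without any explicit binding.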
Temporary queues
Before, we used the queues "hello" and "task_queue". Being able to name a queue was crucial for us. Giving a queue a name is important when you want to share the queue between producers and consumers.
In the Java client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive, auto-delete queue with a generated name:
String queueName = channel.queueDeclare().getQueue();
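The life cycle of such a temporary queue can be sketched in memory as follows. This is a toy model, not the broker's implementation: TemporaryQueueSketch and its methods are hypothetical, and the UUID suffix is an assumption that only imitates the "amq.gen-" prefix of server-generated names. It shows two points: every declaration gets a fresh name, and an exclusive queue disappears when the connection that declared it closes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Toy model of server-named, exclusive queues (hypothetical names).
class TemporaryQueueSketch {
    // Maps each generated queue name to the connection that owns it.
    private final Map<String, String> ownerByQueue = new HashMap<>();

    // Create a queue with a fresh generated name, owned by one connection.
    String declareTemporaryQueue(String connectionId) {
        // Assumption: the random suffix format is illustrative only.
        String name = "amq.gen-" + UUID.randomUUID();
        ownerByQueue.put(name, connectionId);
        return name;
    }

    // Closing a connection deletes every exclusive queue it declared.
    void closeConnection(String connectionId) {
        ownerByQueue.values().removeIf(owner -> owner.equals(connectionId));
    }

    boolean exists(String queueName) {
        return ownerByQueue.containsKey(queueName);
    }
}
```

Because the name is generated, the consumer has to keep the returned value, as the queueName variable does, in order to bind the queue later.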
Bindings
The relationship between an exchange and a queue is called a binding.
channel.queueBind(queueName, "logs", "");
> sudo sbin/rabbitmqctl list_bindings
Putting it all together
Emit the message
package com.sillycat.easytalker.rabbitmq.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";
    private static final String SERVER_HOST = "localhost";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(SERVER_HOST);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the fanout exchange before publishing to it.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "errordebugyinformwarninglog!";
        // Publish to the named exchange; a fanout exchange ignores the routing key.
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
Receiving the log messages
package com.sillycat.easytalker.rabbitmq.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";
    private static final String SERVER_HOST = "localhost";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(SERVER_HOST);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Create a temporary, server-named queue and bind it to the exchange.
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("Waiting for messages. To exit press CTRL+C");

        // QueueingConsumer is only available in Java client versions before 5.0.
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // The second argument enables automatic acknowledgement.
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            // Block until the next message arrives.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
Each queue belongs to its consumer, which binds it to the exchange.
references:
http://www.rabbitmq.com/tutorials/tutorial-three-java.html