RabbitMQ(5)Java Client - Publish/Subscribe

We will deliver messages to multiple consumers. This pattern is known as "publish/subscribe".

Exchanges

The producer does not send messages directly to the queues. Instead, there is an exchange between them. The exchange must know exactly what to do with each message it receives: should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? The rules for that are defined by the exchange type.

There are a few exchange types available: direct, topic, headers and fanout.

channel.exchangeDeclare("logs", "fanout");

A fanout exchange simply broadcasts every message it receives to all the queues it knows about.
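To make the broadcast rule concrete, here is a small in-memory model of a fanout exchange. It is only a sketch of the routing behaviour, not the RabbitMQ client API: the classes `FanoutSketch` and `ToyFanoutExchange` and the methods `bind` and `publish` are invented for illustration, and a real broker performs this work on the server.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: these classes are hypothetical and not part of the RabbitMQ client.
class FanoutSketch {
    public static void main(String[] args) {
        ToyFanoutExchange logs = new ToyFanoutExchange();
        Queue<String> queueA = new ArrayDeque<>();
        Queue<String> queueB = new ArrayDeque<>();
        logs.bind(queueA);
        logs.bind(queueB);

        logs.publish("first log line");

        // Each bound queue now holds its own copy of the message.
        System.out.println("queueA: " + queueA);
        System.out.println("queueB: " + queueB);
    }
}

class ToyFanoutExchange {
    // The queues this exchange "knows about".
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    // Copies the message to every bound queue and returns how many queues received it.
    // With no bound queues the message is simply dropped and 0 is returned.
    int publish(String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
        return boundQueues.size();
    }
}
```

Note that the model takes no routing key at all, which mirrors the fact that a fanout exchange does not use one when deciding where a message goes.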

We can use this command to list all the exchanges on our server:

> sudo sbin/rabbitmqctl list_exchanges

Nameless exchange

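Previously, we sent our messages through the default exchange, which we identify by the empty string (""). With the default exchange, a message is routed to the queue whose name matches the routing key, if such a queue exists.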

channel.basicPublish("", "hello", null, message.getBytes());
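The routing rule of the nameless exchange can be pictured with a small in-memory model: the routing key is treated as a queue name. This is a sketch under that assumption, not the client API; `DefaultExchangeSketch`, `ToyDefaultExchange`, `declareQueue`, `queue` and `publish` are hypothetical names used only here.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model only: these classes are hypothetical and not part of the RabbitMQ client.
class DefaultExchangeSketch {
    public static void main(String[] args) {
        ToyDefaultExchange exchange = new ToyDefaultExchange();
        exchange.declareQueue("hello");
        exchange.declareQueue("task_queue");

        // The routing key "hello" selects the queue named "hello".
        exchange.publish("hello", "Hello World!");

        System.out.println("hello: " + exchange.queue("hello"));
        System.out.println("task_queue: " + exchange.queue("task_queue"));
    }
}

class ToyDefaultExchange {
    private final Map<String, Queue<String>> queuesByName = new HashMap<>();

    void declareQueue(String name) {
        queuesByName.computeIfAbsent(name, key -> new ArrayDeque<>());
    }

    Queue<String> queue(String name) {
        return queuesByName.get(name);
    }

    // Delivers to the queue named by the routing key.
    // Returns false when no such queue exists and the message is dropped.
    boolean publish(String routingKey, String message) {
        Queue<String> target = queuesByName.get(routingKey);
        if (target == null) {
            return false;
        }
        target.add(message);
        return true;
    }
}
```

Only one queue can receive each message in this model, which is why a different exchange type is needed for publish/subscribe.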

Temporary queues

Previously we used queues with the specific names "hello" and "task_queue". Being able to name a queue was crucial for us. Giving a queue a name is important when you want to share the queue between producers and consumers.

In the Java client, when we supply no parameters to queueDeclare(), we create a non-durable, exclusive, auto-delete queue with a generated name:

String queueName = channel.queueDeclare().getQueue();
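The lifecycle of such a temporary queue can be sketched with a toy in-memory broker: the queue gets a generated name, belongs to one connection, and disappears when that connection closes. Everything here is an assumption made for illustration, including the classes `TemporaryQueueSketch` and `ToyBroker`, their methods, and the `toy.gen-` name prefix; the names a real server generates look different.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

// Toy model only: these classes are hypothetical and not part of the RabbitMQ client.
class TemporaryQueueSketch {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        String queueName = broker.declareTemporaryQueue("connection-1");

        System.out.println("generated name: " + queueName);
        System.out.println("exists while connected: " + broker.exists(queueName));

        broker.closeConnection("connection-1");
        System.out.println("exists after close: " + broker.exists(queueName));
    }
}

class ToyBroker {
    private final Map<String, Queue<String>> queues = new HashMap<>();
    private final Map<String, String> ownerByQueue = new HashMap<>();

    // Creates a queue with a generated name that is owned by a single connection.
    String declareTemporaryQueue(String connectionId) {
        String name = "toy.gen-" + UUID.randomUUID(); // hypothetical naming scheme
        queues.put(name, new ArrayDeque<>());
        ownerByQueue.put(name, connectionId);
        return name;
    }

    boolean exists(String queueName) {
        return queues.containsKey(queueName);
    }

    // Closing a connection removes every queue that connection owns.
    void closeConnection(String connectionId) {
        Iterator<Map.Entry<String, String>> it = ownerByQueue.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(connectionId)) {
                queues.remove(entry.getKey());
                it.remove();
            }
        }
    }
}
```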

Bindings

The relationship between an exchange and a queue is called a binding.

channel.queueBind(queueName, "logs", "");

We can list the existing bindings with:

> sudo sbin/rabbitmqctl list_bindings

Putting it all together

Emitting the log messages

package com.sillycat.easytalker.rabbitmq.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";
    private static final String SERVER_HOST = "localhost";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(SERVER_HOST);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the fanout exchange that the message is published to.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "error debug info warning log!";

        // Publish to the named exchange; the routing key is left empty.
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println("[x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

Receiving the log messages

package com.sillycat.easytalker.rabbitmq.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// Note: QueueingConsumer ships with older versions of the Java client
// (it was removed in the 5.x releases).
public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";
    private static final String SERVER_HOST = "localhost";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(SERVER_HOST);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Temporary queue with a generated name, bound to the exchange.
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // The second argument (true) enables automatic acknowledgement.
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            // Blocks until the next message arrives.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received '" + message + "'");
        }
    }
}

Each queue belongs to its consumer and is bound to the exchange.

references:

http://www.rabbitmq.com/tutorials/tutorial-three-java.html
