java 实现利用 RabbitMQ 发送和消费消息
1.简介
RabbitMQ 是一个消息代理。从本质上说,它从生产者接收消息,然后把这些消息传递给消费者。在这期间,它能根据你制定的规则发送,缓存,或者持久化存储这些消息。 1 2
RabbitMQ 使用到的专业术语。
1).Producing 的意思不仅仅是发送消息。发送消息的程序叫做producer。我们像下图一样描绘它。
2).Queue 是一个消息盒子的名称。它存活在 RabbitMQ 里。虽然消息流经 RabbitMQ 和你的应用程序,但是他们只能在 Queue 里才能被保存。Queue 没有任何边界的限制,你想存多少消息都可以,它本质上是一个无限的缓存。许多生产者都可以向一个 Queue 里发送消息,许多消费者都可以从一个 Queue 里接收消息。我们像下图一样描绘它。
3).Consuming 的意思和接收类似。一个消费者主要是指等待接收消息的程序。我们像下图一样描绘它。
注意:生产者,消费者和代理不一定非要在同一台机器上,在大多数应用中的确也是这样的。
2.”Hello World”
在这部分的使用指南中,我们要用 Java 写两个程序;一个是生产者,他发送一个消息,另一个是消费者,它接收消息,并且把消息打印出来。我们将会忽略一些Java API 的细节,而是将注意力主要放在我们将要做的这件事上,这件事就是发送一个 “Hello World” 消息。
在下面的图中,”P” 代表生产者,而 “C” 代表消费者。中间的就是一个 Queue,一个消息缓存区。
(P) -> [|||] -> (C)
java 客户端类库
AMQP 是一个开源的,通用的消息协议。有几个用不同语言编写的 AMQP 的客户端。我们将要使用 RabbitMQ 提供的 java 版本的客户端。
下载这个客户端 (http://www.rabbitmq.com/java-client.html) ,把它解压到你的工作目录下,并且找到jar文件。
$ unzip rabbitmq-java-client-bin-*.zip $ cp rabbitmq-java-client-bin-*/*.jar ./ 这个客户端在 Maven 的中心仓库中也有。 groupId:com.rabbitmq artifactId:amqp-client 1 2 3 4 5 6 7 8 9
现在我们有了客户端和依赖,我们开始写代码。
Sending
(P) -> [|||]
我们把消息发送者叫 Send,消息接收者叫 Recv。消息发送者连接 RabbitMQ ,发送一个消息,然后退出。
创建 Send.java 文件,并导入我们需要的类。
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; 1 2 3
创建给 Queue 命名
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } } 1 2 3 4 5 6 7 8
然后我们创建一个连接。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); 1 2 3 4 5 6
到这,我们就和本机的代理建立了连接。如果我们想要连接到不同机器上的代理,我们只需要制定那台机器的域名或 IP 地址即可。
接下来,我们创建一个管道。为了发送消息,我们还需要声明一个 Queue ,然后我们就能发布消息到 Queue 上了。
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); 1 2 3 4
Queue 声明的过程是幂等的,只有在它不存在的情况下才会被创建出来。消息的内容是字节数组,这意味着你能够使用任何你想使用的编码。
最后,我们关闭这个管道和连接。
下面是我们完整的 Send.java 文件
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
Receiving
不像消息的发送者只是发送一个消息,我们的接收者需要不断的监听消息,并把它们打印出来。
[|||] -> (C)
在 Recv.java 这个文件中需要导入的类和 Send.java 中类似。
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; 1 2 3 4 5
DefaultConsumer 是一个实现了 Consumer 接口的类,我们将会用它来缓存从服务器推送过来的消息。
创建 Recv.java 文件,打开一个连接和一个管道,并且声明一个我们将要消费消息的 Queue。注意:这个 Queue 的名字要和 Send.java 文件中的一致。
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
注意到,我们在这又声明了一个 Queue,这是因为我们可能在运行 Send.java 文件之前运行 Recv.java 文件,所以我们要确保在从 Queue 消费消息之前,
它是已经存在的。
我们告诉服务器从 Queue 中向我们传递消息,由于它是以异步的方式向我们传递消息,所以我们采用从消息的缓存对象回调的方式,直到我们已经消费了这些消
息。这就是 DefaultConsumer 子类所做的事。
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); 1 2 3 4 5 6 7 8 9 10
下面是完整的 Recv.java 文件。
import com.rabbitmq.client.*; import java.io.IOException; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } } Putting it all together 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
编译文件:
$ javac -cp rabbitmq-client.jar Send.java Recv.java 运行时,我们需要 rabbitmq-client.jar,打开终端,运行 Send 文件: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send 然后运行 Recv 文件: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv 1 2 3 4 5 6 7 8 9
消息的接收者将会打印出发送者发送的消息,接收者将保持运行的状态,并且等待消息,使用 Ctrl+C 可以结束运行。因此,尽可能从另一个终端运行发送者程序。