java 实现利用 RabbitMQ 发送和消费消息

1.简介

RabbitMQ 是一个消息代理。从本质上说,它从生产者接收消息,然后把这些消息传递给消费者。在这期间,它能根据你制定的规则发送,缓存,或者持久化存储这些消息。
1
2

RabbitMQ 使用到的专业术语。

1).Producing 的意思不仅仅是发送消息。发送消息的程序叫做producer。我们像下图一样描绘它。

2).Queue 是一个消息盒子的名称。它存活在 RabbitMQ 里。虽然消息流经 RabbitMQ 和你的应用程序,但是他们只能在 Queue 里才能被保存。Queue 没有任何边界的限制,你想存多少消息都可以,它本质上是一个无限的缓存。许多生产者都可以向一个 Queue 里发送消息,许多消费者都可以从一个 Queue 里接收消息。我们像下图一样描绘它。

3).Consuming 的意思和接收类似。一个消费者主要是指等待接收消息的程序。我们像下图一样描绘它。

注意:生产者,消费者和代理不一定非要在同一台机器上,在大多数应用中的确也是这样的。

2.”Hello World”

在这部分的使用指南中,我们要用 Java 写两个程序;一个是生产者,他发送一个消息,另一个是消费者,它接收消息,并且把消息打印出来。我们将会忽略一些Java API 的细节,而是将注意力主要放在我们将要做的这件事上,这件事就是发送一个 “Hello World” 消息。

在下面的图中,”P” 代表生产者,而 “C” 代表消费者。中间的就是一个 Queue,一个消息缓存区。

(P) -> [|||] -> (C)

java 客户端类库

AMQP 是一个开源的,通用的消息协议。有几个用不同语言编写的 AMQP 的客户端。我们将要使用 RabbitMQ 提供的 java 版本的客户端。

下载这个客户端 (http://www.rabbitmq.com/java-client.html) ,把它解压到你的工作目录下,并且找到jar文件。

$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./
这个客户端在 Maven 的中心仓库中也有。
groupId:com.rabbitmq
artifactId:amqp-client
1
2
3
4
5
6
7
8
9

现在我们有了客户端和依赖,我们开始写代码。

Sending

(P) -> [|||]

我们把消息发送者叫 Send,消息接收者叫 Recv。消息发送者连接 RabbitMQ ,发送一个消息,然后退出。

创建 Send.java 文件,并导入我们需要的类。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
1
2
3

创建给 Queue 命名

public class Send {
 private final static String QUEUE_NAME = "hello";
 public static void main(String[] argv)
 throws java.io.IOException {
 ...
 }
}
1
2
3
4
5
6
7
8

然后我们创建一个连接。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
1
2
3
4
5
6

到这,我们就和本机的代理建立了连接。如果我们想要连接到不同机器上的代理,我们只需要制定那台机器的域名或 IP 地址即可。

接下来,我们创建一个管道。为了发送消息,我们还需要声明一个 Queue ,然后我们就能发布消息到 Queue 上了。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
1
2
3
4

Queue 声明的过程是幂等的,只有在它不存在的情况下才会被创建出来。消息的内容是字节数组,这意味着你能够使用任何你想使用的编码。

最后,我们关闭这个管道和连接。

下面是我们完整的 Send.java 文件

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "hello";
 public static void main(String[] argv) throws Exception {
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("localhost");
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 String message = "Hello World!";
 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
 System.out.println(" [x] Sent '" + message + "'");
 channel.close();
 connection.close();
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

Receiving

不像消息的发送者只是发送一个消息,我们的接收者需要不断的监听消息,并把它们打印出来。

[|||] -> (C)

在 Recv.java 这个文件中需要导入的类和 Send.java 中类似。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
1
2
3
4
5

DefaultConsumer 是一个实现了 Consumer 接口的类,我们将会用它来缓存从服务器推送过来的消息。

创建 Recv.java 文件,打开一个连接和一个管道,并且声明一个我们将要消费消息的 Queue。注意:这个 Queue 的名字要和 Send.java 文件中的一致。

public class Recv {
 private final static String QUEUE_NAME = "hello";
 public static void main(String[] argv)
 throws java.io.IOException,
 java.lang.InterruptedException {
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("localhost");
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 ...
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

注意到,我们在这又声明了一个 Queue,这是因为我们可能在运行 Send.java 文件之前运行 Recv.java 文件,所以我们要确保在从 Queue 消费消息之前,

它是已经存在的。

我们告诉服务器从 Queue 中向我们传递消息,由于它是以异步的方式向我们传递消息,所以我们采用从消息的缓存对象回调的方式,直到我们已经消费了这些消

息。这就是 DefaultConsumer 子类所做的事。

Consumer consumer = new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
 throws IOException {
 String message = new String(body, "UTF-8");
 System.out.println(" [x] Received '" + message + "'");
 }
 };
 channel.basicConsume(QUEUE_NAME, true, consumer);
1
2
3
4
5
6
7
8
9
10

下面是完整的 Recv.java 文件。

import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("localhost");
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 Consumer consumer = new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
 throws IOException {
 String message = new String(body, "UTF-8");
 System.out.println(" [x] Received '" + message + "'");
 }
 };
 channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
Putting it all together
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

编译文件:

$ javac -cp rabbitmq-client.jar Send.java Recv.java
运行时,我们需要 rabbitmq-client.jar,打开终端,运行 Send 文件:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send
然后运行 Recv 文件:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
1
2
3
4
5
6
7
8
9

消息的接收者将会打印出发送者发送的消息,接收者将保持运行的状态,并且等待消息,使用 Ctrl+C 可以结束运行。因此,尽可能从另一个终端运行发送者程序。

java 实现利用 RabbitMQ 发送和消费消息

相关推荐