rabbitmq延迟消息示例

官方插件仅支持>=3.6.x 版本中支持。

本文描述的消息延迟机制采用官方推荐的插件rabbitmq-delayed-message-exchange,如精通rabbitmq和编程,请自行查看官方文档,描述更加详尽:

安装

需要在集群每台机器中安装

由于rabbitmq并未内置该插件,需要手动下载安装。关于已安装的插件通过rabbitmq-plugins list可查看.

加载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/lib/rabbitmq/plugins中(windows和其他系统<安装目录>\rabbitmq_server-version\plugins).

启用插件

需要在集群每台机器中执行

通过rabbitmq-plugins list查看已安装列表,如下:

...
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x
...

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件,输出如下:

The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

通过rabbitmq-plugins list查看已安装列表,如下:

...
[E*] rabbitmq_delayed_message_exchange 20171215-3.6.x
...

机制

安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

Java使用过程

声明x-delayed-message消息交换机

  • rabbitmq java client实现
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
  • spring rabbitmq template实现
// ... elided code ...
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
Exchange exchange = new CustomExchange("test.exchange", "x-delayed-message", true, false, args);
//admin = RabbitmqAdmin
admin.declareExchange(exchange);

//more code...

消息发送

  • rabbitmq java client实现
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("test.exchange", "test", props.build(), messageBodyBytes);
// ... more code ...
  • spring rabbitmq template实现
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", 1000);
//template : RabbitmqTemplate
template.convertAndSend("test.exchange", "test", new Message(body, properties));

相关推荐