Node.js连接RabbitMQ,断线重连,动态绑定routing key
RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基于回调的。
下面将给出基于Promise式的写法。并且实现动态的队列绑定
初始化配置
const amqp = require('amqplib') // rabbitMQ地址 const {amqpAddrHost} = require('../config/index.js') // 交换机名称 const ex = 'amq.topic' const amqpAddr = `amqp://${amqpAddrHost}` // 读取HOSTNAME, 在跑多实例时,例如在k8s中,HOSTNAME可以获取当前pod的名称 // 多实例时,写日志,或者建立连接时,最好带上pod名称,如果出现问题,也比较好定位哪个pod出现的问题。 const hostName = process.env.HOSTNAME // 队列的属性设置 // 一般来说,最好设置队列自动删除autoDelete,当链接断开时,队列也会删除,这样不会产生非常多的无用队列 // durable是用来的持久化的,最好也可以设置成不持久化 const queueAttr = {autoDelete: true, durable: false} // 定义channel的引用,当链接建立时,所有方法都可以通过引用CH来获取channel方法 let CH = null
向队列发送消息的函数
// 向队列发送消息的函数 function publishMessage (msg) { if (!CH) { return '' } msg = JSON.stringify(msg) // 指定交换机ex, routing key, 以及消息的内容 CH.publish(ex, eventBusTopic, Buffer.from(msg)) }
当链接rabbitMQ断开时,要主动去重连
function reconnectRabbitMq () { log.info('reconnect_rabbit_mq') connectRabbitMq() }
连接rabbitMQ的主要函数
function connectRabbitMq () { amqp.connect(amqpAddr, { // 设置connection_name的属性,可以在rabbitMQ的控制台的UI上,看到连接是来自哪个实例 clientProperties: { connection_name: hostName } }) .then((conn) => { log.info('rabbitmq_connect_successd') // 一定要加上链接的报错事件处理,否则一旦报error错,如果不处理这个错误,程序就会崩溃 // error是个特别的事件,务必要处理的 // 报错就直接去重连 conn.on('error', (err) => { log.error('connect_error ' + err.message, err) reconnectRabbitMq() }) // 创建channel return conn.createChannel() }) .then((ch) => { CH = ch // 初始化交换机 ch.assertExchange(ex, 'topic', {durable: true}) // 初始化一个队列,队列名就用hostName, 比较容易从对列名上知道是哪个实例创建的队列 return ch.assertQueue(hostName, queueAttr) }) .then((q) => { // 可以在队列初始化完毕就立即绑定routing key, 也可以暂时不绑定,后续动态的绑定 // CH.bindQueue(q.queue, ex, 'some.topic.aaa') // 消费者,获取消息 CH.consume(q.queue, (msg) => { var _msg = msg.content.toString() var MSG = JSON.parse(_msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) => { console.log(err) }) }
动态给队列绑定或者解绑routing key
function toggleBindQueue (routingKey, bind) { return new Promise((resolve, reject) => { if (!CH) { log.error('channel not established') reject(new Error('channel not established')) return '' } // 初始化队列,如果队列已经存在,就会直接使用 CH.assertQueue(`${hostName}`, queueAttr) .then((q) => { // 如果bind是true,就绑定。否则就解绑 if (bind) { log.info(`bindQueue ${hostName} ${topic}`) return CH.bindQueue(q.queue, ex, topic) } else { return CH.unbindQueue(q.queue, ex, topic) } }) .then((res) => { resolve() }) .catch((err) => { reject(err) log.error(err) }) }) } module.exports = { connectRabbitMq, toggleBindQueue, publishMessage }
使用方法
加入你的服务端用的是Express, 那么在app.js中可以
... const {connectRabbitMq} = require('./connect-mq.js') connectRabbitMq() ...
完整代码
// onnect-mq.js const amqp = require('amqplib') // rabbitMQ地址 const {amqpAddrHost} = require('../config/index.js') // 交换机名称 const ex = 'amq.topic' const amqpAddr = `amqp://${amqpAddrHost}` // 读取HOSTNAME, 在跑多实例时,例如在k8s中,HOSTNAME可以获取当前pod的名称 // 多实例时,写日志,或者建立连接时,最好带上pod名称,如果出现问题,也比较好定位哪个pod出现的问题。 const hostName = process.env.HOSTNAME // 队列的属性设置 // 一般来说,最好设置队列自动删除autoDelete,当链接断开时,队列也会删除,这样不会产生非常多的无用队列 // durable是用来的持久化的,最好也可以设置成不持久化 const queueAttr = {autoDelete: true, durable: false} // 定义channel的引用,当链接建立时,所有方法都可以通过引用CH来获取channel方法 let CH = null // 向队列发送消息的函数 function publishMessage (msg) { if (!CH) { return '' } msg = JSON.stringify(msg) // 指定交换机ex, routing key, 以及消息的内容 CH.publish(ex, eventBusTopic, Buffer.from(msg)) } // 当链接rabbitMQ断开时,要主动去重连 function reconnectRabbitMq () { log.info('reconnect_rabbit_mq') connectRabbitMq() } // 链接rabbitMQ的主要函数 function connectRabbitMq () { amqp.connect(amqpAddr, { // 设置connection_name的属性,可以在rabbitMQ的控制台的UI上,看到链接是来自哪个实例 clientProperties: { connection_name: hostName } }) .then((conn) => { log.info('rabbitmq_connect_successd') // 一定要加上链接的报错事件处理,否则一旦报error错,如果不处理这个错误,程序就会崩溃 // error是个特别的事件,务必要处理的 // 报错就直接去重连 conn.on('error', (err) => { log.error('connect_error ' + err.message, err) reconnectRabbitMq() }) // 创建channel return conn.createChannel() }) .then((ch) => { CH = ch // 初始化交换机 ch.assertExchange(ex, 'topic', {durable: true}) // 初始化一个队列,队列名就用hostName, 比较容易从对列名上知道是哪个实例创建的队列 return ch.assertQueue(hostName, queueAttr) }) .then((q) => { // 可以在队列初始化完毕就立即绑定routing key, 也可以暂时不绑定,后续动态的绑定 // CH.bindQueue(q.queue, ex, 'some.topic.aaa') // 消费者,获取消息 CH.consume(q.queue, (msg) => { var _msg = msg.content.toString() var MSG = JSON.parse(_msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) => { console.log(err) }) } // 动态给队列绑定或者解绑routing key function toggleBindQueue (routingKey, bind) { return new Promise((resolve, reject) => { if (!CH) { log.error('channel not established') reject(new Error('channel not established')) return '' } // 初始化队列,如果队列已经存在,就会直接使用 CH.assertQueue(`${hostName}`, queueAttr) .then((q) => { // 如果bind是true,就绑定。否则就解绑 if (bind) { log.info(`bindQueue ${hostName} ${topic}`) return CH.bindQueue(q.queue, ex, topic) } else { return CH.unbindQueue(q.queue, ex, topic) } }) .then((res) => { resolve() }) .catch((err) => { reject(err) log.error(err) }) }) } module.exports = { connectRabbitMq, toggleBindQueue, publishMessage }
相关推荐
liqinglin0 2020-06-01
jvm 2020-05-30
tztzyzyz 2018-09-07
madanling 2018-12-11
mfkpum 2018-11-14
suis 2019-07-25
Danielmumu 2016-06-22
人心 2019-06-27
hackcat 2015-04-23
超级赛亚人 2014-11-15
无情 2014-11-14
asdjkl 2014-09-09
JeWangZhe 2013-07-30
无情 2013-07-02
JAVALin 2013-05-29
人心 2019-06-25
83457317 2011-11-03