RabbitMQ:笔记
持久化
- 交换器持久化
- 队列持久化
- 消息持久化
(交换器)
(队列)
(消息)
生产者可靠性
事务机制
普通发送
$ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params);
事务模式:单个发送
$ch->startTransaction(); try { $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params); $ch->commitTransaction(); } catch(AMQPConnectionException $e) { $ch->rollbackTransaction(); }
事务模式:批量发送
$loop_times = 10; $ch->startTransaction(); for($i=0;$i<$loop_times;$i++) { try { $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params); $ch->commitTransaction(); } catch(AMQPConnectionException $e) { $ch->rollbackTransaction(); } }
发送者确认
单个消息
$ch->confirmSelect(); $ex->publish($message, $routing_key,AMQP_MANDATORY, $params); $ack_callback = function ($delivery_tag, $multiple) { // ack处理 echo 'Message acked', PHP_EOL; var_dump(func_get_args()); return true; }; $nack_callback = function ($delivery_tag, $multiple, $requeue) use ($message) { // nack处理: 重新发送消息,或记录日志 echo 'Message nacked', PHP_EOL; var_dump(func_get_args()); return false; }; $ch->setConfirmCallback($ack_callback, $nack_callback); // 设置回调 $ch->waitForConfirm(1); // 在setConfirmCallback()后调用
来自broker的ack确认
批量消息
$ch->confirmSelect(); // 批量发布,一次确认 $messages = []; for($i=0;$i<10;$i++) { $messages[$i] = $i.$message; $ex->publish($messages[$i], $routing_key, AMQP_MANDATORY, $params); } $ack_callback = function ($delivery_tag, $multiple) { // ack处理 echo 'Message acked', PHP_EOL; var_dump(func_get_args()); return true; }; $nack_callback = function ($delivery_tag, $multiple, $requeue) use ($messages) { // nack处理: 重新发送消息该批次消息,或者记录日志 echo 'Message nacked', PHP_EOL; var_dump(func_get_args()); return false; }; $ch->setConfirmCallback($ack_callback, $nack_callback); // 设置回调 $ch->waitForConfirm(1); // 在setConfirmCallback()后调用
broker发送了两个ack确认(完整的网络包中producer发送了10条消息);
如果收到nack信令,需要重新发送整个批次消息。
小问题:为什么broker回复了两个ack?broker回复ack数量和机制是什么?
- 异步确认(pecl_amqp暂时不支持)
消费者可靠性
推模式(basicConsume)
自动ack
$conn->setReadTimeout(3); // 无数据时,超时时间设置 $consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true); // consume try { $data = $q->consume($callback, AMQP_AUTOACK, $consumer_tag); } catch (Exception $exception) { // 无数据,超时处理 // 显式取消消费 $q->cancel($consumer_tag); }
3s后超时,cancel掉消费
完整例子(consume过程)
$conn->setReadTimeout(3); $consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true); $num = 3; $callback = function (AMQPEnvelope $envelope, AMQPQueue $queue) use(&$num){ $tag = $envelope->getDeliveryTag(); var_dump($tag); var_dump($num); if($num <= 0){ // MARK:false显式退出callback,会导致丢数据 // 例如队列中有30条数据,条件判断$num<=0退出,会导致业务逻辑只处理了4条数据 // 剩余的数据未处理而丢掉 // return false; } $num --; return true; }; // consume try { $data = $q->consume($callback, AMQP_AUTOACK, $consumer_tag); } catch (Exception $exception) { // 显式取消消费 $q->cancel($consumer_tag); }
(consume数据,到无数据)
另外:
qos()/setPrefetchCount()/setPrefetchSize()对于autoack无效
autoack小结:
回调超时通过timeout实现,且catch异常后,需要cancel消费者;
callback回调函数,不应该return false(造成队列丢数据),如遇异常可以记录日志等;
qos()等限流设置对autoack无效。
manual ack
$conn->setReadTimeout(3); $ch->qos(0,1); // prefetchCount:最多1条unacked消息 $consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true); $num = 3; $callback = function (AMQPEnvelope $envelope, AMQPQueue $queue) use(&$num){ $tag = $envelope->getDeliveryTag(); $msg = $envelope->getBody(); var_dump(implode(',',[$tag, $num, $msg]).PHP_EOL); // 手动ack $queue->ack($tag); sleep(1); if($num <= 0){ // 显式退出callback return false; } $num --; return true; }; // consume try { $data = $q->consume($callback, AMQP_NOPARAM, $consumer_tag); } catch (Exception $exception) { // 显式取消消费 $q->cancel($consumer_tag); }
callback()回调函数中,只处理4条;
尽管queue推送了5条消息,但是consumer只确认了4条,因此队列里只是减少了4条消息
manual ack小结:
timeout实现回调超时,同autoack;
回调函数内可以按逻辑return false,不会丢失消息(消息短暂处于unacked状态后,恢复至ready状态);
qos()等限流函数,有效,可以避免客户端内存溢出
no ack
消息需要显式ack(),但是没有执行ack()
结论:
消息处于unacked状态,consumer channel断开后,消息恢复至ready状态
拉模式(basicGet)
拉模式:非阻塞消费
auto ack
$conn->setReadTimeout(3); // 拉模式:非阻塞 $ch->qos(0,10); // 自动ack,qos设置无效 $num = 3; for($i=0;$i<$num;$i++) { $envelope = $q->get(AMQP_AUTOACK); var_dump($envelope->getDeliveryTag()."||".$envelope->getBody()); echo "<br>"; }
有数据时,立即返回数据;无数据时,立即返回false
manual ack
$conn->setReadTimeout(3); // 拉模式:非阻塞 $ch->qos(0,10); // 自动ack,qos设置无效 $num = 3; for($i=0;$i<$num;$i++) { $envelope = $q->get(); var_dump($envelope->getDeliveryTag()."||".$envelope->getBody()); echo "<br>"; if($i!=$num-1) { $q->ack($envelope->getDeliveryTag()); } }
拉取了3条消息,只ack了两条
最终,只消费了队列内的两条数据,第3条数据短暂变成unacked状态后,恢复至ready状态
no ack
结论:
未ack的消息,会恢复至ready状态
to be continued
Refer:
RabbitMQ专栏:https://blog.csdn.net/u013256...
php-amqp测试用例:https://github.com/pdezwart/p...