rabbitmq 延时队列
前言
某个产品 或者订单,有个有效期 过了有效期要取消
方法一 : 写个脚本,用crontab 定时扫描 改变状态 但是最低只能一分钟 ,不适合
方法二 : 用swoole得毫秒定时器,每秒钟去扫描表 明显占用资源 mysql受不了
方法三 :用rabbitmq延时队列 一开始将其丢入mq 死信队列,设置有效期,过时转发到其他队列,再启动一个消费者 消费 更改表状态
php 安装mq扩展
https://www.cnblogs.com/brady-wang/p/7662393.html
搭建mq服务
https://www.cnblogs.com/brady-wang/p/7660174.html
创建生产者和消费者
生产者 publish.php
<?php header(‘Content-Type:text/html;charset=utf8;‘); $time = 30; $params = array( ‘exchangeName‘ => ‘test_cache_exchange‘."_".$time, ‘queueName‘ => ‘test_cache_queue‘."_".$time, ‘routeKey‘ => ‘test_cache_route‘."_".$time, ); $connectConfig = array( ‘host‘ => ‘127.0.0.1‘, ‘port‘ => 5672, ‘login‘ => ‘admin‘, ‘password‘ => ‘password‘, ‘vhost‘ => ‘/‘ ); //var_dump(extension_loaded(‘amqp‘)); // //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die(‘Conexiune esuata‘); //TODO 记录日志 echo ‘rabbit-mq 连接错误:‘, json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die(‘Connection through channel failed‘); //TODO 记录日志 echo ‘rabbit-mq Connection through channel failed:‘, json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setFlags(AMQP_DURABLE);//持久化 $exchange->setName($params[‘exchangeName‘]?:‘‘); $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel); $queue->setName($params[‘queueName‘]?:‘‘); $queue->setFlags(AMQP_DURABLE); // 和普通生产者区别 在这 下面是过期时间和转发到的路由 $queue->setArguments(array( ‘x-dead-letter-exchange‘ => ‘delay_exchange‘, ‘x-dead-letter-routing-key‘ => ‘delay_route‘, ‘x-message-ttl‘ => $time*1000, )); $queue->declareQueue(); //绑定 $queue->bind($params[‘exchangeName‘], $params[‘routeKey‘]); } catch(Exception $e) { } //$num = mt_rand(100, 500); $num = 1; //生成消息 $exchange->publish(date("Y-m-d H:i:s"), $params[‘routeKey‘], AMQP_MANDATORY, array(‘delivery_mode‘=>2));
消费者 consumer.php
<?php header(‘Content-Type:text/html;charset=utf8;‘); $params = array( ‘exchangeName‘ => ‘delay_exchange‘, ‘queueName‘ => ‘delay_queue‘, ‘routeKey‘ => ‘delay_route‘, ); $connectConfig = array( ‘host‘ => ‘localhost‘, ‘port‘ => 5672, ‘login‘ => ‘admin‘, ‘password‘ => ‘password‘, ‘vhost‘ => ‘/‘ ); //var_dump(extension_loaded(‘amqp‘)); //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die(‘Conexiuneesuata‘); //TODO记录日志 echo ‘rabbit-mq连接错误:‘, json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { //die(‘Connectionthroughchannelfailed‘); //TODO记录日志 echo ‘rabbit-mqConnectionthroughchannelfailed:‘, json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端 $exchange->setName($params[‘exchangeName‘] ?: ‘‘); $exchange->setType(AMQP_EX_TYPE_DIRECT);//direct类型 $exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel); $queue->setName($params[‘queueName‘] ?: ‘‘); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //绑定 $queue->bind($params[‘exchangeName‘], $params[‘routeKey‘]); } catch (Exception$e) { echo $e->getMessage(); exit(); } function callback(AMQPEnvelope $message){ global $queue; if ($message) { $body = $message->getBody(); echo $body . PHP_EOL; $queue->ack($message->getDeliveryTag()); } else { echo ‘nomessage‘ . PHP_EOL; } } //$queue->consume(‘callback‘);第一种消费方式,但是会阻塞,程序一直会卡在此处 //第二种消费方式,非阻塞 $start = time(); while (true) { $message = $queue->get(); if (!empty($message)) { echo $message->getBody()."--失效时间 ".date("Y-m-d H:i:s"). PHP_EOL; $queue->ack($message->getDeliveryTag());//应答,代表该消息已经消费 // $end = time(); // echo ‘<br>‘ . ($end - $start); } else { //echo‘messagenotfound‘.PHP_EOL; } }
执行推送 我改了不同时间推送,会生成不同的交换机 路由 队列,因为我用得是direct类型 要一一匹配
消费者开启
[ mq]# php consumer.php 2020-07-18 11:07:22--失效时间 2020-07-18 11:07:42 2020-07-18 11:07:22--失效时间 2020-07-18 11:07:42 2020-07-18 11:07:23--失效时间 2020-07-18 11:07:43 2020-07-18 11:07:23--失效时间 2020-07-18 11:07:43 2020-07-18 11:13:04--失效时间 2020-07-18 11:13:24 2020-07-18 11:21:00--失效时间 2020-07-18 11:21:10 2020-07-18 11:21:32--失效时间 2020-07-18 11:22:02 2020-07-18 11:21:32--失效时间 2020-07-18 11:22:02 2020-07-18 11:21:22--失效时间 2020-07-18 11:22:12 2020-07-18 11:21:23--失效时间 2020-07-18 11:22:13 2020-07-18 11:21:23--失效时间 2020-07-18 11:22:13
发现正常,都是我设置的事件过期后就到处理队列,在这里消费,处理逻辑即可
参考 https://www.cnblogs.com/Zhangcsc/p/11739754.html
https://blog.csdn.net/weixin_34310369/article/details/92262465?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-2
相关推荐
程序员伊成 2020-08-06
waitzkj 2020-07-25
powrexly 2020-07-20
shenzhenzsw 2020-06-21
Soongp 2020-06-07
waitzkj 2020-06-04
cj0 2020-06-01
zhuxue 2020-10-14
shenzhenzsw 2020-10-09
shyoldboy 2020-09-27
leihui00 2020-09-16
lishijian 2020-08-17
ljcsdn 2020-07-27
liym 2020-07-20
zhoucheng0 2020-07-19
shenzhenzsw 2020-07-18
woaishanguosha 2020-07-18
zhoucheng0 2020-07-08