Mix PHP V2 实例:协程池异步邮件发送守护程序
邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。
下面演示一个异步邮件发送系统的开发过程,涉及知识点:
- 异步
- 消息队列
- 守护进程
- 协程池
如何使用消息队列实现异步
PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:
- redis
- rabbitmq
- kafka
本次我们选用 Redis 来实现异步邮件发送,Redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:
// 入列 $redis->lpush($key, $data); // 出列 $data = $redis->rpop($key); // 阻塞出列 $data = $redis->brpop($key, 10);
架构设计
本实例由传统 MVC 框架投递邮件发送需求(生产者),Mix PHP 编写的守护程序执行发送任务(消费者)。
邮件发送库选型
以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。
由于发送任务是由 Mix PHP 执行,所以 swiftmailer 是安装在 Mix PHP 项目中,在项目根目录中执行以下命令安装:
composer require swiftmailer/swiftmailer
生产者开发
在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 Mix PHP 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。
在传统 MVC 框架的控制器中增加如下代码:
通常框架中使用 Redis 会安装一个类库来使用,本例使用原生代码,便于理解。// 连接 $redis = new \Redis(); if (!$redis->connect('127.0.0.1', 6379)) { throw new \Exception('Redis connect failed.'); } $redis->auth(''); $redis->select(0); // 投递任务 $data = [ 'to' => '***@qq.com', 'body' => 'The message content', 'subject' => 'The title content', ]; $redis->lpush('queue:email', serialize($data));
通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有在生产者中执行,而是待消息被消费者获取后才执行。
消费者开发
使用本例时,请确保你使用的 Swoole 编译时开启了 openssl本例我们采用 Mix PHP V2 的守护程序、协程池来完成一个超高性能的邮件发送程序。
因为我们是开发一个守护程序,所以我们在 applications/daemon
模块中开发,首先我们在配置 applications/daemon/config/main.php
中注册一个命令:
// 命令 'commands' => [ 'mailer' => ['Mailer', 'description' => 'Mailer daemon.'], ],
注册的命令中指定的 Mailer 命令类,接下来我们编写一个 MailerCommand 类:
applications/daemon/src/Commands/MailerCommand.php
<?php namespace Daemon\Commands; use Daemon\Libraries\MailerWorker; use Mix\Concurrent\CoroutinePool\Dispatcher; use Mix\Core\Coroutine\Channel; use Mix\Helper\ProcessHelper; /** * Class MailerCommand * @package Daemon\Commands * @author liu,jian <[email protected]> */ class MailerCommand { /** * 退出 * @var bool */ public $quit = false; /** * 主函数 */ public function main() { // 捕获信号 ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) { $this->quit = true; ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null); }); // 协程池执行任务 xgo(function () { $maxWorkers = 20; $maxQueue = 20; $jobQueue = new Channel($maxQueue); $dispatch = new Dispatcher([ 'jobQueue' => $jobQueue, 'maxWorkers' => $maxWorkers, ]); $dispatch->start(MailerWorker::class); // 投放任务 $redis = app()->redisPool->getConnection(); while (true) { if ($this->quit) { $dispatch->stop(); return; } try { $data = $redis->brPop(['queue:email'], 3); } catch (\Throwable $e) { $dispatch->stop(); return; } if (!$data) { continue; } $data = array_pop($data); // brPop命令最后一个键才是值 $jobQueue->push($data); } }); } }从
$data = $redis->brPop(['queue:email'], 3);
外部的异常捕获可得知,当 Redis 连接出错时,比如 Redis 重启、连接异常时协程池会安全退出,也就是说当进程异常退出后用户需使用 supervisor
、pm2
等工具重启守护进程。上面是一个 Mix PHP 协程池的使用代码,基本可以直接复制使用,框架默认包含了协程池的 Demo,本次实例只是修改了协程池的 Worker,本命令主要是完成从 Redis 队列中获取消息然后 push 到 jobQueue 中,jobQueue 中的数据会被 20 个 Worker 实例中某一个抢占后并行执行,本例的邮件发送代码逻辑就在 MailerWorker 类中:
applications/daemon/src/Libraries/MailerWorker.php
<?php namespace Daemon\Libraries; use Mix\Concurrent\CoroutinePool\AbstractWorker; use Mix\Concurrent\CoroutinePool\WorkerInterface; /** * Class MailerWorker * @package Daemon\Libraries * @author liu,jian <[email protected]> */ class MailerWorker extends AbstractWorker implements WorkerInterface { /** * 邮件发送器 * @var Mailer */ public $mailer; /** * 初始化事件 */ public function onInitialize() { parent::onInitialize(); // TODO: Change the autogenerated stub // 实例化一些需重用的对象 $this->mailer = new Mailer(); } /** * 处理 * @param $data */ public function handle($data) { // TODO: Implement handle() method. $data = unserialize($data); if (empty($data)) { return; } try { $this->mailer->send($data['to'], $data['subject'], $data['body']); app()->log->info("Mail sent successfully:to {to} subject {subject}", $data); } catch (\Throwable $e) { app()->log->error("Mail failed to send:to {to} subject {subject} error {error}", array_merge($data, ['error' => $e->getMessage()])); } } }
由以上代码可见,Worker 在初始化时,新增了一个 Mailer 类的属性,当 jobQueue 消息投递过来时消息会传递到 handle 方法,在该方法中使用 Mailer 类的实例完成邮件发送任务,所以我们要编写了一个 Mailer 发送程序:
applications/daemon/src/Libraries/Mailer.php
<?php namespace Daemon\Libraries; use Mix\Core\Coroutine; /** * Class Mailer * @package Daemon\Libraries * @author liu,jian <[email protected]> */ class Mailer { /** * 配置信息 */ const HOST = 'smtpdm.aliyun.com'; const PORT = 465; const SECURITY = 'ssl'; const USERNAME = '***'; const PASSWORD = '***'; /** * Mailer constructor. */ public function __construct() { // 开启协程钩子 Coroutine::enableHook(); } /** * 发送 * @param $to * @param $subject * @param $body * @return int */ public function send($to, $subject, $body) { // Create the Transport $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY)) ->setUsername(self::USERNAME) ->setPassword(self::PASSWORD); // Create the Mailer using your created Transport $mailer = new \Swift_Mailer($transport); // Create a message $message = (new \Swift_Message($subject)) ->setFrom([self::USERNAME => '**网']) ->setTo($to) ->setBody($body); // Send the message return $mailer->send($message); } }
在 Mailer 发送程序中我们使用了前面 composer 安装的 swiftmailer 库来发送邮件,以上就完成了全部的代码逻辑,现在我们开始测试。
先启动消费者守护程序:
[root@localhost bin]# ./mix-daemon mailer
将上文的生产者脚本命名为 push.php
然后在 CLI 中执行 (开一个新终端):
[root@localhost bin]# php /tmp/push.php
消费者守护程序结果:
[root@localhost bin]# ./mix-daemon mailer [info] 2019-04-15 11:48:36 [message] Mail sent successfully:to ***@qq.com subject The title content
命令行终端打印了发送成功的日志,发送完成。