laravel 队列
在实际的项目开发中,我们经常会遇到需要轻量级队列的情形,例如发短信、发邮件等,这些任务不足以使用 kafka、RabbitMQ 等重量级的消息队列,但是又的确需要异步、重试、并发控制等功能。通常来说,我们经常会使用 Redis、Beanstalk、Amazon SQS 来实现相关功能,laravel 为此对不同的后台队列服务提供统一的 API,本文将会介绍应用最为广泛的 redis 队列。
在讲解 laravel 的队列服务之前,我们要先说说基于 redis 的队列服务。首先,redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列
redis 队列的数据结构
List 链表
redis 做消息队列的特性例如FIFO(先入先出)很容易实现,只需要一个 list 对象从头取数据,从尾部塞数据即可。
相关的命令:(1)左侧入右侧出:lpush/rpop;(2)右侧入左侧出:rpush/lpop。
这个简单的消息队列很容易实现。
Zset 有序集合
有些任务场景,并不需要任务立刻执行,而是需要延迟执行;有些任务很重要,需要在任务失败的时候重新尝试。这些功能仅仅依靠 list 是无法完成的。这个时候,就需要 redis 的有序集合。
Redis 有序集合和 Redis 集合类似,是不包含相同字符串的合集。它们的差别是,每个有序集合的成员都关联着一个评分 score,这个评分用于把有序集合中的成员按最低分到最高分排列。
单看有序集合和延迟任务并无关系,但是可以将有序集合的评分 score 设置为延时任务开启的时间,之后轮询这个有序集合,将到期的任务拿出来进行处理,这样就实现了延迟任务的功能。
对于重要的需要重试的任务,在任务执行之前,会将该任务放入有序集合中,设置任务最长的执行时间。若任务顺利执行完毕,该任务会在有序集合中删除。如果任务没有在规定时间内完成,那么该有序集合的任务将会被重新放入队列中。
相关命令:
(1) ZADD 添加一个或多个成员到有序集合,或者如果它已经存在更新其分数。
(2) ZRANGEBYSCORE 按分数返回一个成员范围的有序集合。
(3) ZREMRANGEBYRANK 在给定的索引之内删除所有成员的有序集合。
laravel 队列服务的任务调度
队列服务的任务调度过程如下:
laravel 的队列服务由两个进程控制,一个是生产者,一个是消费者。这两个进程操纵了 redis 三个队列,其中一个 List,负责即时任务,两个 Zset,负责延时任务与待处理任务。
生产者负责向 redis 推送任务,如果是即时任务,默认就会向 queue:default 推送;如果是延时任务,就会向 queue:default:delayed 推送。
消费者轮询两个队列,不断的从队列中取出任务,先把任务放入 queue:default:reserved 中,再执行相关任务。如果任务执行成功,就会删除 queue:default:reserved 中的任务,否则会被重新放入 queue:default:delayed 队列中。
laravel 队列服务的总体流程
任务分发流程:
任务处理器运作:
创建任务
queue 设置
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'retry_after' => 90, ],
在config/queue.php中进行配置
一般来说,默认的 redis 配置如上,connection 是 database 中 redis 的连接名称;queue 是 redis 中的队列名称,值得注意的是,如果使用的是 redis 集群的话,这个需要使用 key hash tag,也就是 {default};当任务运行超过 retry_after 这个时间后,该任务会被重新放入队列当中。
任务类的创建
任务类的结构很简单,一般来说只会包含一个让队列用来调用此任务的 handle 方法。
如果想要使得任务被推送到队列中,而不是同步执行,那么需要实现 IlluminateContractsQueueShouldQueue 接口。
如果想要让任务推送到特定的连接中,例如 redis 或者 sqs,那么需要设置 conneciton 变量。
如果想要让任务推送到特定的队列中去,可以设置 queue 变量。
如果想要让任务延迟推送,那么需要设置 delay 变量。
如果想要设置任务至多重试的次数,可以使用 tries 变量;
如果想要设置任务可以运行的最大秒数,那么可以使用 timeout 参数。
如果想要手动访问队列,可以使用 trait : IlluminateQueueInteractsWithQueue。
任务的分发
分发服务
写好任务类后,就能通过 dispatch 辅助函数来分发它了。唯一需要传递给 dispatch 的参数是这个任务类的实例:
class PodcastController extends Controller { public function store(Request $request) { // 创建播客... ProcessPodcast::dispatch($podcast); } }
如果想延迟执行一个队列中的任务,可以用任务实例的 delay 方法。
ProcessPodcast::dispatch($podcast) ->delay(Carbon::now()->addMinutes(10));
通过推送任务到不同的队列,可以给队列任务分类,甚至可以控制给不同的队列分配多少任务。要指定队列的话,就调用任务实例的 onQueue 方法:
ProcessPodcast::dispatch($podcast)->onQueue('processing');
如果使用了多个队列连接,可以将任务推到指定连接。要指定连接的话,可以在分发任务的时候使用 onConnection 方法:
ProcessPodcast::dispatch($podcast)->onConnection('redis ');
KingFer