gmq 基于 golang 和 redis 实现的简易队列 项目简介
1. 概述gmq是基于redis提供的特性,使用go语言开发的一个简单易用的队列;关于 redis 使用特性可以参考之前本人写过一篇很简陋的文章 Redis 实现队列;gmq的灵感和设计是基于有赞延迟队列设计,文章内容清晰而且很好理解,但是没有提供源码,在文章的最后也提到了一些未来架构方向; gmq不是简单按照有赞延迟队列的设计实现功能,在它的基础上,做了一些修改和优化,主要如下:功能上 多种任务模式,不单单只是延迟队列;例如:延迟队列,普通队列,优先级队列 架构上: 添加 job 由dispatcher调度分配各个bucket,而不是由timer 每个bucket维护一个timer,而不是所有bucket一个timer timer每次扫描bucket到期job时,会一次性返回多个到期job,而不是每次只返回一个job timer的扫描时钟由bucket中下个job到期时间决定,而不是每秒扫描一次 2. 应用场景延迟任务 延迟任务,例如用户下订单一直处于未支付状态,半个小时候自动关闭订单 异步任务 异步任务,一般用于耗时操作,例如群发邮件等批量操作 超时任务 规定时间内(TTR)没有执行完毕或程序被意外中断,则消息重新回到队列再次被消费,一般用于数据比较敏感,不容丢失的 优先级任务 当多个任务同时产生时,按照任务设定等级优先被消费,例如a,b两种类型的job,优秀消费a,然后再消费b 3. 安装3.1 源码运行配置文件位于gmq/conf.ini,可以根据自己项目需求修改配置cd $GOPATH/src # 进入gopath/src目录
git clone https://github.com/wuzhc/gmq.git
cd gmq
go get -u -v github.com/kardianos/govendor # 如果有就不需要安装了
govendor sync -v # 如果很慢,可能需要翻墙
go run main.go start3.2 执行文件运行cd $GOPATH/src/gmq
# 编译成可执行文件
go build
# 启动
./gmq start
# 停止
./gmq stop
# 守护进程模式启动,不输出日志到console
nohup ./gmq start >/dev/null 2>&1 &
# 守护进程模式下查看日志输出(配置文件conf.ini需要设置target_type=file,filename=gmq.log)
tail -f gmq.log4. 客户端目前只实现python,go,php语言的客户端的demo,参考:https://github.com/wuzhc/demo/tree/master/mq运行# php
# 生产者
php producer.php
# 消费者
php consumer.php
# python
# 生产者
python producer.py
# 消费者
python consumer.py一条消息结构{
"id": "xxxx", # 任务id,这个必须是一个唯一值,将作为redis的缓存键
"topic": "xxx", # topic是一组job的分类名,消费者将订阅topic来消费该分类下的job
"body": "xxx", # 消息内容
"delay": "111", # 延迟时间,单位秒
"TTR": "11111", # 执行超时时间,单位秒
"status": 1, # job执行状态,该字段由gmq生成
"consumeNum":1, # 被消费的次数,主要记录TTR>0时,被重复消费的次数,该字段由gmq生成
}延迟任务$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '1800', // 单位秒,半个小时后执行
'TTR' => '0'
];超时任务$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '100' // 100秒后还未得到消费者ack确认,则再次添加到队列,将再次被被消费
];异步任务$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];优先级任务$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_A","topic_B","topic_C"], //优先消费topic_A,消费完后消费topic_B,最后再消费topic_C
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];5. gmq 流程图如下:5.1 延迟时间 delay当 job.delay>0时,job 会被分配到 bucket 中,bucket 会有周期性扫描到期 job,如果到期,job 会被 bucket 移到ready queue,等待被消费 当 job.delay=0 时,job 会直接加到ready queue,等待被消费5.2 执行超时时间 TTR参考第一个图的流程,当 job 被消费者读取后,如果job.TTR>0,即 job 设置了执行超时时间,那么 job 会在读取后会被添加到 TTRBucket(专门存放设置了超时时间的 job),并且设置job.delay = job.TTR,如果在 TTR 时间内没有得到消费者 ack 确认然后删除 job,job 将在 TTR 时间之后添加到ready queue,然后再次被消费(如果消费者在 TTR 时间之后才请求 ack,会得到失败的响应)5.3 确认机制主要和TTR的设置有关系,确认机制可以分为两种:当 job.TTR=0 时,消费者pop出 job 时,即会自动删除job pool中的 job 元数据 当 job.TTR>0 时,即 job 执行超时时间,这个时间是指用户pop出 job 时开始到用户ack确认删除结束这段时间,如果在这段时间没有ACK,job 会被再次加入到ready queue,然后再次被消费,只有用户调用了ACK,才会去删除job pool中 job 元数据6. web 监控gmq提供了一个简单 web 监控平台(后期会提供根据 job.Id 追踪消息的功能),方便查看当前堆积任务数,默认监听端口为8000,例如:http://127.0.0.1:8000,界面如下: 后台模板来源于https://github.com/george518/PPGo_Job7. 遇到问题以下是开发遇到的问题,以及一些粗糙的解决方案7.1 安全退出如果强行中止gmq的运行,可能会导致一些数据丢失,例如下面一个例子:如果发生上面的情况,就会出现 job 不在bucket中,也不在ready queue,这就出现了 job 丢失的情况,而且将没有任何机会去删除job pool中已丢失的 job,长久之后job pool可能会堆积很多的已丢失 job 的元数据;所以安全退出需要在接收到退出信号时,应该等待所有goroutine处理完手中的事情,然后再退出7.1.1 gmq退出流程首先gmq通过 context 传递关闭信号给dispatcher,dispatcher接收到信号会关闭dispatcher.closed,每个bucket会收到close信号,然后先退出timer检索,再退出bucket,dispatcher等待所有 bucket 退出后,然后退出dispatcher退出顺序流程: timer -> bucket -> dispatcher7.1.2 注意不要使用kill -9 pid来强制杀死进程,因为系统无法捕获 SIGKILL 信号,导致 gmq 可能执行到一半就被强制中止,应该使用kill -15 pid,kill -1 pid或kill -2 pid,各个数字对应信号如下:9 对应 SIGKILL 15 对应 SIGTERM 1 对应 SIGHUP 2 对应 SIGINT 信号参考https://www.jianshu.com/p/5729fc095b2a7.2 智能定时器每一个bucket都会维护一个timer,不同于有赞设计,timer不是每秒轮询一次,而是根据bucket下一个 job 到期时间来设置timer的定时时间 ,这样的目的在于如果bucket没有 job 或 job 到期时间要很久才会发生,就可以减少不必要的轮询; timer只有处理完一次业务后才会重置定时器,这样的目的在于可能出现上一个时间周期还没执行完毕,下一个定时事件又发生了 如果到期的时间很相近,timer就会频繁重置定时器时间,就目前使用来说,还没出现什么性能上的问题7.3 原子性问题我们知道 redis 的命令是排队执行,在一个复杂的业务中可能会多次执行 redis 命令,如果在大并发的场景下,这个业务有可能中间插入了其他业务的命令,导致出现各种各样的问题;redis 保证整个事务原子性和一致性问题一般用multi/exec或lua脚本<span style="background-color:#ffffff;">,</span>gmq在操作涉及复杂业务时使用的是lua脚本,因为lua 脚本除了有multi/exec的功能外,还有Pipepining功能(主要打包命令,减少和redis server通信次数),下面是一个gmq定时器扫描 bucket 集合到期 job 的 lua 脚本:-- 获取到期的50个job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do
if k%2~=0 then
local jobKey = string.format('%s:%s', ARGV[3], jobId)
local status = redis.call('hget', jobKey, 'status')
-- 检验job状态
if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
-- 先移除集合中到期的job,然后到期的job返回给timer
local isDel = redis.call('zrem', KEYS[1], jobId)
if isDel == 1 then
table.insert(res, jobId)
end
end
end
end
local nextTime
-- 计算下一个job执行时间,用于设置timer下一个时钟周期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
nextTime = -1
else
nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
if nextTime < 0 then
nextTime = 1
end
end
table.insert(res,1,nextTime)
return res7.4 redis 连接池可能一般 phper 写业务很少会接触到连接池,其实这是由 php 本身所决定他应用不大,当然在 php 的扩展swoole还是很有用处的gmq的 redis 连接池是使用gomodule/redigo/redis自带连接池,它带来的好处是限制 redis 连接数,通过复用 redis 连接来减少开销,另外可以防止 tcp 被消耗完,这在生产者大量生成数据时会很有用// gmq/mq/redis.go
Redis = &RedisDB{
Pool: &redis.Pool{
MaxIdle: 30, // 最大空闲链接
MaxActive: 10000, // 最大链接
IdleTimeout: 240 * time.Second, // 空闲链接超时
Wait: true, // 当连接池耗尽时,是否阻塞等待
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
if err != nil {
return nil, err
}
return c, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
},
}8. 注意问题job.id 在job pool是唯一的,它将作为 redis 的缓存键;gmq不自动为 job 生成唯一 id 值是为了用户可以根据自己生成的 job.id 来追踪 job 情况,如果 job.id 是重复的,push 时会报重复 id 的错误 bucket 数量不是越多越好,一般来说,添加到ready queue的速度取决与 redis 性能,而不是 bucket 数量9. 使用中可能出现的问题9.1 客户端出现大量的 TIME_WAIT 状态,并且新的连接被拒绝netstat -anp | grep 9503 | wc -l
tcp 0 0 10.8.8.188:41482 10.8.8.185:9503 TIME_WAIT -这个是正常现象,由 tcp 四次挥手可以知道,当接收到 LAST_ACK 发出的 FIN 后会处于TIME_WAIT状态,主动关闭方(客户端)为了确保被动关闭方(服务端)收到 ACK,会等待 2MSL 时间,这个时间是为了再次发送 ACK,例如被动关闭方可能因为接收不到 ACK 而重传 FIN;另外也是为了旧数据过期,不影响到下一个链接;如果要避免大量TIME_WAIT的连接导致 tcp 被耗尽,一般方法如下:使用长连接 配置文件,网上很多教程,就是让系统尽快的回收TIME_WAIT状态的连接 使用连接池,当连接池耗尽时,阻塞等待,直到回收再利用10. 相关链接有赞延迟队列设计 Redis 实现队列
git clone https://github.com/wuzhc/gmq.git
cd gmq
go get -u -v github.com/kardianos/govendor # 如果有就不需要安装了
govendor sync -v # 如果很慢,可能需要翻墙
go run main.go start3.2 执行文件运行cd $GOPATH/src/gmq
# 编译成可执行文件
go build
# 启动
./gmq start
# 停止
./gmq stop
# 守护进程模式启动,不输出日志到console
nohup ./gmq start >/dev/null 2>&1 &
# 守护进程模式下查看日志输出(配置文件conf.ini需要设置target_type=file,filename=gmq.log)
tail -f gmq.log4. 客户端目前只实现python,go,php语言的客户端的demo,参考:https://github.com/wuzhc/demo/tree/master/mq运行# php
# 生产者
php producer.php
# 消费者
php consumer.php
# python
# 生产者
python producer.py
# 消费者
python consumer.py一条消息结构{
"id": "xxxx", # 任务id,这个必须是一个唯一值,将作为redis的缓存键
"topic": "xxx", # topic是一组job的分类名,消费者将订阅topic来消费该分类下的job
"body": "xxx", # 消息内容
"delay": "111", # 延迟时间,单位秒
"TTR": "11111", # 执行超时时间,单位秒
"status": 1, # job执行状态,该字段由gmq生成
"consumeNum":1, # 被消费的次数,主要记录TTR>0时,被重复消费的次数,该字段由gmq生成
}延迟任务$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '1800', // 单位秒,半个小时后执行
'TTR' => '0'
];超时任务$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '100' // 100秒后还未得到消费者ack确认,则再次添加到队列,将再次被被消费
];异步任务$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];优先级任务$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_A","topic_B","topic_C"], //优先消费topic_A,消费完后消费topic_B,最后再消费topic_C
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];5. gmq 流程图如下:5.1 延迟时间 delay当 job.delay>0时,job 会被分配到 bucket 中,bucket 会有周期性扫描到期 job,如果到期,job 会被 bucket 移到ready queue,等待被消费 当 job.delay=0 时,job 会直接加到ready queue,等待被消费5.2 执行超时时间 TTR参考第一个图的流程,当 job 被消费者读取后,如果job.TTR>0,即 job 设置了执行超时时间,那么 job 会在读取后会被添加到 TTRBucket(专门存放设置了超时时间的 job),并且设置job.delay = job.TTR,如果在 TTR 时间内没有得到消费者 ack 确认然后删除 job,job 将在 TTR 时间之后添加到ready queue,然后再次被消费(如果消费者在 TTR 时间之后才请求 ack,会得到失败的响应)5.3 确认机制主要和TTR的设置有关系,确认机制可以分为两种:当 job.TTR=0 时,消费者pop出 job 时,即会自动删除job pool中的 job 元数据 当 job.TTR>0 时,即 job 执行超时时间,这个时间是指用户pop出 job 时开始到用户ack确认删除结束这段时间,如果在这段时间没有ACK,job 会被再次加入到ready queue,然后再次被消费,只有用户调用了ACK,才会去删除job pool中 job 元数据6. web 监控gmq提供了一个简单 web 监控平台(后期会提供根据 job.Id 追踪消息的功能),方便查看当前堆积任务数,默认监听端口为8000,例如:http://127.0.0.1:8000,界面如下: 后台模板来源于https://github.com/george518/PPGo_Job7. 遇到问题以下是开发遇到的问题,以及一些粗糙的解决方案7.1 安全退出如果强行中止gmq的运行,可能会导致一些数据丢失,例如下面一个例子:如果发生上面的情况,就会出现 job 不在bucket中,也不在ready queue,这就出现了 job 丢失的情况,而且将没有任何机会去删除job pool中已丢失的 job,长久之后job pool可能会堆积很多的已丢失 job 的元数据;所以安全退出需要在接收到退出信号时,应该等待所有goroutine处理完手中的事情,然后再退出7.1.1 gmq退出流程首先gmq通过 context 传递关闭信号给dispatcher,dispatcher接收到信号会关闭dispatcher.closed,每个bucket会收到close信号,然后先退出timer检索,再退出bucket,dispatcher等待所有 bucket 退出后,然后退出dispatcher退出顺序流程: timer -> bucket -> dispatcher7.1.2 注意不要使用kill -9 pid来强制杀死进程,因为系统无法捕获 SIGKILL 信号,导致 gmq 可能执行到一半就被强制中止,应该使用kill -15 pid,kill -1 pid或kill -2 pid,各个数字对应信号如下:9 对应 SIGKILL 15 对应 SIGTERM 1 对应 SIGHUP 2 对应 SIGINT 信号参考https://www.jianshu.com/p/5729fc095b2a7.2 智能定时器每一个bucket都会维护一个timer,不同于有赞设计,timer不是每秒轮询一次,而是根据bucket下一个 job 到期时间来设置timer的定时时间 ,这样的目的在于如果bucket没有 job 或 job 到期时间要很久才会发生,就可以减少不必要的轮询; timer只有处理完一次业务后才会重置定时器,这样的目的在于可能出现上一个时间周期还没执行完毕,下一个定时事件又发生了 如果到期的时间很相近,timer就会频繁重置定时器时间,就目前使用来说,还没出现什么性能上的问题7.3 原子性问题我们知道 redis 的命令是排队执行,在一个复杂的业务中可能会多次执行 redis 命令,如果在大并发的场景下,这个业务有可能中间插入了其他业务的命令,导致出现各种各样的问题;redis 保证整个事务原子性和一致性问题一般用multi/exec或lua脚本<span style="background-color:#ffffff;">,</span>gmq在操作涉及复杂业务时使用的是lua脚本,因为lua 脚本除了有multi/exec的功能外,还有Pipepining功能(主要打包命令,减少和redis server通信次数),下面是一个gmq定时器扫描 bucket 集合到期 job 的 lua 脚本:-- 获取到期的50个job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do
if k%2~=0 then
local jobKey = string.format('%s:%s', ARGV[3], jobId)
local status = redis.call('hget', jobKey, 'status')
-- 检验job状态
if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
-- 先移除集合中到期的job,然后到期的job返回给timer
local isDel = redis.call('zrem', KEYS[1], jobId)
if isDel == 1 then
table.insert(res, jobId)
end
end
end
end
local nextTime
-- 计算下一个job执行时间,用于设置timer下一个时钟周期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
nextTime = -1
else
nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
if nextTime < 0 then
nextTime = 1
end
end
table.insert(res,1,nextTime)
return res7.4 redis 连接池可能一般 phper 写业务很少会接触到连接池,其实这是由 php 本身所决定他应用不大,当然在 php 的扩展swoole还是很有用处的gmq的 redis 连接池是使用gomodule/redigo/redis自带连接池,它带来的好处是限制 redis 连接数,通过复用 redis 连接来减少开销,另外可以防止 tcp 被消耗完,这在生产者大量生成数据时会很有用// gmq/mq/redis.go
Redis = &RedisDB{
Pool: &redis.Pool{
MaxIdle: 30, // 最大空闲链接
MaxActive: 10000, // 最大链接
IdleTimeout: 240 * time.Second, // 空闲链接超时
Wait: true, // 当连接池耗尽时,是否阻塞等待
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
if err != nil {
return nil, err
}
return c, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
},
}8. 注意问题job.id 在job pool是唯一的,它将作为 redis 的缓存键;gmq不自动为 job 生成唯一 id 值是为了用户可以根据自己生成的 job.id 来追踪 job 情况,如果 job.id 是重复的,push 时会报重复 id 的错误 bucket 数量不是越多越好,一般来说,添加到ready queue的速度取决与 redis 性能,而不是 bucket 数量9. 使用中可能出现的问题9.1 客户端出现大量的 TIME_WAIT 状态,并且新的连接被拒绝netstat -anp | grep 9503 | wc -l
tcp 0 0 10.8.8.188:41482 10.8.8.185:9503 TIME_WAIT -这个是正常现象,由 tcp 四次挥手可以知道,当接收到 LAST_ACK 发出的 FIN 后会处于TIME_WAIT状态,主动关闭方(客户端)为了确保被动关闭方(服务端)收到 ACK,会等待 2MSL 时间,这个时间是为了再次发送 ACK,例如被动关闭方可能因为接收不到 ACK 而重传 FIN;另外也是为了旧数据过期,不影响到下一个链接;如果要避免大量TIME_WAIT的连接导致 tcp 被耗尽,一般方法如下:使用长连接 配置文件,网上很多教程,就是让系统尽快的回收TIME_WAIT状态的连接 使用连接池,当连接池耗尽时,阻塞等待,直到回收再利用10. 相关链接有赞延迟队列设计 Redis 实现队列