python分布式框架rq的使用
RedisQueue是一款轻量级的分布式异步任务队列调度框架,基于redis数据库作为broker,生产端将任务job存储到redis数据库中,消费端监听队列并取出任务执行。
1.基础架构
rq框架使用前需要安装rq库,使用pip安装即可:pip install rq
1.1 生产者
生产者将任务发送到指定redis的指定队列中,job.py:
import requests from rq import Queue # 具体业务执行代码 def count_words_at_url(url): resp = requests.get(url) return len(resp.text.split()) conn = redis.Redis(host='127.0.0.1', password='123456', port=8888, db=10) # 指定redis数据库 q = Queue("high", connection=conn) # 指定队列 job_urls = ['http://baidu.com', 'http://qq.com', 'http://sohu.com'] for url in job_urls: q.enqueue_call(count_words_at_url, args=(url,)) # 发送任务
1.2 消费者
消费者采用多进程架构,监听指定队列从中取出任务消息,并调用执行业务处理函数,worker.py:
import redis from multiprocessing import Pool from rq import Worker, Queue, Connection conn = redis.Redis(host='127.0.0.1', password='123456', port=8888, db=10) # 同一个redis连接 # worker逻辑,无需修改 def worker(listen): with Connection(conn): worker = Worker(map(Queue, listen)) worker.work() if __name__ == "__main__": listen = ['high', 'default', 'low'] # 监听的队列,可自定义修改 try: cpu_num = 4 p = Pool(cpu_num) for i in range(cpu_num): p.apply_async(worker, args=(listen,)) # 开启worker p.close() p.join() except Exception as e: print(e)
2.基于redis集群
若是基于redis集群,可以用如下方式获取redis连接:
from rq.cli.helpers import get_redis_from_config settings = {'SENTINEL': { 'INSTANCES': [('redis1.cloud.bz', 8080), ('redis2.cloud.bz', 8080), ('redis3.cloud.bz', 8080)], 'SOCKET_TIMEOUT': None, 'PASSWORD': '', 'DB': 10, 'MASTER_NAME': 'sentinel-127.0.0.1-8080' } } conn = get_redis_from_config(settings)
相关推荐
sushuanglei 2020-11-12
地平线 2020-11-02
ptmagic 2020-10-31
richermen 2020-10-15
jaryle 2020-10-13
深圳克林斯曼 2020-10-10
koko0c 2020-09-26
ahaoGG 2020-09-24
互联网架构之路 2020-09-17
阿义 2020-09-11
Cheetahcubs 2020-09-07
互联网架构之路 2020-09-03
憧憬 2020-08-21
zyshappy 2020-08-16
loviezhang 2020-08-08
xayddxjsjxywuhui 2020-07-20
唐亚杰 2020-07-17
ZHANGYONGHAO0 2020-07-05
枫叶上的雨露 2020-07-04