Prefetch count--预取数量

一、前言

前面提到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

Prefetch count--预取数量

二、事例

生产端:

# -*- coding: UTF-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

# 声明队列,并进行队列持久化
channel.queue_declare(queue='task_queue', durable=True)

# 信息内容
message = "Hello,World!"

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 消息持久化
                      ))
print('[x] Sent %r' % message)
connection.close()

消费端:

# -*- coding: UTF-8 -*-

import pika
import time
import random

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

# 声明一个队列,并队列持久化
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print('[x] Received %r' % body)
    time.sleep(random.randint(1, 20))
    print('[x] Done')
    # 当消息处理完成后,主动通知rabbitmq,之后rabbitmq才会删除
    # 队列中的这条消息
    ch.basic_ack(delivery_tag=method.delivery_tag)

# prefetch_count = 1 如果消费者中有一条消息没有
# 处理完,就不会继续给这个消费者发送新消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

# 永远收下去,没有就在这卡住
channel.start_consuming()

可以运行多个consumer,让它们不断接受消息,可以看到只有当consumer中的消息处理完成了,才会接受下一个消息。

相关推荐