python 线程通信 生产者与消费者

"""
线程通信的生产者与消费者
python的queue模块中提供了同步的线程安全的队列类,都具有原子性,实现线程间的同步
Queue (FIFO: fist in fist out)
LifoQueue (LIFO: last in fist out)
PriorityQueue (优先级队列)

task_done():
作用是在使用join()的时候,当queue中所有的项目都被取出,且每个项目取出后都使用了task_done(),那么就可以释放join()阻塞
系统解释如下:
用于消费者,每次get()以后,使用task_done() 是告诉队列正在处理的get任务完成
如果join()当前处于阻塞状态,那么当处理完所有项时它将继续运行(这意味着对于已经放入队列的每个项都接收到task_done()调用)。
如果调用的次数超过在队列中放置的项的次数,则引发ValueError错误。

如果不需要join()的时候也可以不使用task_done()


"""
import queue
import threading
import time
import random

q = queue.Queue(10)


def produce():
    i = 0
    while i < 10:
        num = random.randint(1, 100)
        q.put("生产者生产出数据:%d" % num)
        print("生产者生产出数据:%d" % num)
        time.sleep(0.2)
        i += 1
    print("生产结束")


def consume():
    while True:
        time.sleep(0.3)
        if q.empty():
            break
        item = q.get()
        print("消费者取出:", item)
        q.task_done()

    print("消费者结束")


if __name__ == ‘__main__‘:

    # 创建生产者
    t1 = threading.Thread(target=produce, name="生产者")
    t1.start()
    time.sleep(0.1)

    # 创建消费者
    t2 = threading.Thread(target=consume, name="消费者")
    t2.start()
    q.join()

    print("over")


# from threading import Thread
# import time
# import random
# from queue import Queue
# from collections import deque
# from datetime import datetime
#
# # 创建队列,设置队列最大数限制为3个
# queue = Queue(3)
#
#
# # 生产者线程
# class Pro_Thread(Thread):
#     def run(self):
#         # 原材料准备,等待被生产,这里使用的是双向队列
#         tasks = deque([1, 2, 3, 4, 5, 6, 7, 8])
#         global queue
#         while True:
#             try:
#                 # 从原材料左边开始生产,如果tasks中没有元素,调用popleft()则会抛出错误
#                 task = tasks.popleft()
#                 queue.put(task)
#                 print(datetime.now(), "生产", task, "现在队列数:", queue.qsize())
#
#                 # 休眠随机时间
#                 time.sleep(0.5)
#             # 如果原材料被生产完,生产线程跳出循环
#             except IndexError:
#                 print("原材料已被生产完毕")
#                 break
#         print("生产完毕")
#
#
# # 消费者线程
# class Con_Thread(Thread):
#     def run(self):
#         global queue
#         while True:
#             if not queue.empty():
#                 # 通过get(),这里已经将队列减去了1
#                 task = queue.get()
#                 time.sleep(2)
#                 # 发出完成的信号,不发的话,join会永远阻塞,程序不会停止
#                 queue.task_done()
#                 print(datetime.now(), "消费", task)
#             else:
#                 break
#         print("消费完毕")
#
#
# # r入口方法,主线程
# def main():
#     Pro_1 = Pro_Thread()
#     # 启动线程
#     Pro_1.start()
#     # 这里休眠一秒钟,等到队列有值,否则队列创建时是空的,主线程直接就结束了,实验失败,造成误导
#     time.sleep(1)
#     for i in range(2):
#         Con_i = Con_Thread()
#         # 启动线程
#         Con_i.start()
#     global queue
#     # 接收信号,主线程在这里等待队列被处理完毕后再做下一步
#     queue.join()
#     # 给个标示,表示主线程已经结束
#     print("主线程结束")
#
#
# if __name__ == ‘__main__‘:
#     main()

相关推荐