Python基础 - 多进程(下)

上篇主要对多任务从生活上来认识, 同时引入对 进程 的认识, 即操作系统资源分配的基本单元. 然后通过对 并发, 并行 概念的认识, 去理解 任务调度. 然后用内置的 multiprocessing 模块来实现多任务的基本操作过程, 大致为:

创建多个任务 --> 为每个任务创建一个进程 --> 启动进程

注意参数的两种传递方式: args=(a,b, c...) 和 kwargs={‘a‘:123, ‘b‘: 456}

还留了一个问题, 关于多进程之前 不能共享全局变量, 而其实要解决这个问题, 就通过一个消息队列(Queue) 来实现消息的传递哦.

主进程 - 等待所有子进程

即多线程的一个默认特性是, 主进程会等待所有的子进程结束后, 才会结束哦.

import os
import time
import multiprocessing


def task_01(n):
    for i in range(n):
        print("task_01 is working...")
        time.sleep(0.5)


if __name__ == '__main__':
    # 创建子进程
    p1 = multiprocessing.Process(target=task_01, args=(3,))
    p1.start()

    # 主进程等待
    time.sleep(1)
    print('主进程执行完毕...')
task_01 is working...
task_01 is working...
主进程执行完毕...
task_01 is working...

这里发现, 主进程是要等待所有的子进程都执行结束后, 才会真正结束哦,

守护主进程

即当主进程结束的时候, 杀掉所有的子进程, 即设置进程的属性 de

import os
import time
import multiprocessing


def task_01(n):
    for i in range(n):
        print("task_01 is working...")
        time.sleep(0.5)


def task_02(n):
    for i in range(n):
        print('task_02 is working..')
        time.sleep(0.2)

if __name__ == '__main__':
    # 创建子进程
    p1 = multiprocessing.Process(target=task_01, args=(3,))
    p2 = multiprocessing.Process(target=task_02, args=(3,))

    p1.daemon = True  # 设置为守护主进程
    # p2.daemon = True

    p1.start()
    p2.start()

    # 主进程等待
    time.sleep(1)
    print('主进程执行完毕...')
    # p2.terminate()
task_01 is working...
task_02 is working..
task_02 is working..
task_02 is working..
task_01 is working...
主进程执行完毕...

设置为守护主进程, 这样就比较灵活一下, 能适应各种业务场景.

消息队列Queue

用来解决, 进程之间的通信的哦.

import time
import multiprocessing


def add_data(queue):
    for i in range(5):
        if queue.full():
            print("queue is full")
            break
        queue.put(i)
        time.sleep(0.1)
        print('add cur_data:', i)


def get_data(queue):
    while True:
        if queue.qsize() == 0:
            print('queue is None')
            break
        print(f"get cur_data: {queue.get()}")




if __name__ == '__main__':
    # 创建消息队列
    queue = multiprocessing.Queue(3)

    # 创建各自的进程
    p_add = multiprocessing.Process(target=add_data, args=(queue,))
    p_get = multiprocessing.Process(target=get_data, args=(queue,))

    p_add.start()
    p_add.join() # 主进程等 p_add 执行完后再执行

    p_get.start()
add cur_data: 0
add cur_data: 1
add cur_data: 2
queue is full
get cur_data: 0
get cur_data: 1
get cur_data: 2
queue is None

进程池 Pool

之前的方式是, 可以为任务手动用 Process 来创建多个进程, 但当任务量是, 几千几万个是, 手动来弄, 似乎有手写难搞. 进程池就是基于这样的场景下, 自动根据任务数量而创建最合理的进程数, 这样不仅不仅场景变多了, 而且能合理利用资源, 更为重要一点, 我感觉是写代码上, 真的非常简洁优雅.

而基本的流程, 在我们初始化 Pool 时候, 可以手动指定一个最大的进程数 (超参数), 当新的请求提交到 Pool 的时候, 如果池没有满, 则创建一个新的进程来执行该请求;

而当池子是满的, 即池子中的进程数已经达到了最大值, 则 请求会自动等待, 直到池子中有空闲资源, 然后就该任务就加入进程池. 这感觉还挺智能的, 说实话.

同步任务

任务一个接一个地完成. 即下一个任务要等上一个任务跑完后才能再继续执行哦.

同步 vs 异步

  • 生活中, 同步, 多个任务一起跑;
  • 编程中, 同步, 多个任务按顺序跑; 异步同时执行多任务, 不等待
import time
import multiprocessing


def send_email():
    print("send_email ...", multiprocessing.current_process().pid)
    time.sleep(1)

    
if __name__ == '__main__':
    
    # 创建Pool 并设置最大进程数
    pool = multiprocessing.Pool(3)
    # 大批量任务哦
    for i in range(6):
        pool.apply(send_email)
        
    # 同步执行任务,要等一个任务完,再执行另一个, 别混淆跟 多进程.
send_email ... 15348
send_email ... 14928
send_email ... 7156
send_email ... 15348
send_email ... 14928
send_email ... 7156

异步任务

这厉害了, 就是多个任务同时执行, 且多进程, 这个效率就非常高呀.

import time
import multiprocessing

def send_email(n):
    print("send_email ...", multiprocessing.current_process().pid)
    time.sleep(1)

if __name__ == '__main__':
    # 创建Pool 并设置最大进程数
    pool = multiprocessing.Pool(3)
    # 大批量任务哦
    for i in range(6):
        # apply_async: 异步, 执行顺序是Pool自己调度哦
        pool.apply_async(send_email, args=(i, ))

    pool.close() # 关闭进程池
    pool.join()  # 等待所有任务执行完成
send_email ... 8096
send_email ... 14520
send_email ... 9324
send_email ... 8096
send_email ... 14520
send_email ... 9324

我是感觉, 这个异步任务, 其顺序自己调度, 有点东西哦. 在工作中, 用异步的情景有, 之前有批量处理文件, 几千个嘛, 表字段差不多的, 需求是对每个文件进行判断, 里面的某个字段的值是否为 xxx . 在这 1000 个文件中, 先处理谁, 后处理谁是没有关系的, 因此就异步呗. 还有一个异步场景是, 爬虫 , 大批量的 url 等着被请求解析, 当然用异步啦.

同步任务. 目前我好像是很少用哦. 主要是我的一些数据处理, 分析的一些活吧, 大多都比较简单, 就读取个数据, 写写简单的 sql, 用 用pandas啥的, 就能满足了, 用的不是太多哦.

小结

  • 理解多任务, 多进程, 在生活和工作中是很常见. 通俗即多个任务一起跑, 真正的其实是cpu的任务调度
  • 多进程之间不能共享全局变量, 可通过消息队列来通信
  • 主进程默认是等子进程结束才结束, 可对子进程设置 daemon 属性来守护主进程; join() 则是等待子进程
  • 进程池用于多任务量, 自动创建合理进程的场景, 主要用同步(顺序) 任务 和 异步任务两种方式.

感觉多进程, 到这差不多了, 基本概念理清楚, 然后能用就行, 也是做个笔记, 主要是为方便后面 copy 代码而已, 嗯, 后面是可以添加一几个小案例. 多任务, 除了多进程, 还有多线程, 协程这样的概念, 一个个都会有的.

哦, 今天是2020年2月29日, 闰年哦, 希望4年后的今天, 我已经变成了数据大牛, 知识大牛. 也需个愿, 并坚持每日学习不怠.

相关推荐