[python 并行3]进程

进程篇

基本使用 1

#coding=utf-8

import multiprocessing
import os       # 获取pid用
import time     # 延时用

# 子进程要执行的函数
def child_proc(name):
    print(f'child process {name} pid: {os.getpid()}')
    time.sleep(3)
    print(f'{name} finish')

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    print(f'parent process {os.getpid()} is running')

    # 生成子进程
    p1 = multiprocessing.Process(target = child_proc, args = ('child-1',))
    p2 = multiprocessing.Process(target = child_proc, args = ('child-2',))
    p1.start()
    p2.start()

    print(f'parent process {os.getpid()} is end')

输出

parent process 20114 is running
parent process 20114 is end
child process child-1 pid: 20115
child process child-2 pid: 20116
child-1 finish
child-2 finish

注意! Python官方文档提到为何必须要使用if __name__ = ‘__main__‘,由于该包的所有功能都需要将主模块导入到子模块中,但是IDLE无法将__main__模块导入子模块,所以只能在文件中编辑好程序执行
更多multiprocessing — Process-based parallelism


multiprocessing模块

:根据查看multiprocessing模块声明,第一行注释显示# Package analogous to ‘threading.py‘ but using processes,可以发现进程的操作与线程相似 (甚至多进程的模块直接就是线程模块改过来的)

函数声明: class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, , daemon=None)
(group: 官方预留的参数)
target: 子线程要执行的函数
name: 给子线程命名
args: 传递参数到要执行的函数中
(类型为元组)*
daemon: 将线程设置为后台线程 2

Thread类包含的方法:

  • start(): 开始进程,它会安排在单独的控制进程中使该对象的run()方法被调用 (invoked) (如果多次调用,会发生错误AssertionError
  • run(): 你可以在子类中重写这个方法,标准的run()方法会在构造器传递了target参数后调用它
  • join(timeout=None): 阻塞当前进程,直到等待调用了join()的进程结束,或到达设置的超时timeout的参数为止
  • name: 进程名
  • is_alive(): 判断进程是否在运行
  • daemon: 是否为后台进程的属性值
  • pid: 返回进程的id
  • terminate(): 结束进程
    (在Unix上,使用SIGTERM信号量完成;在windows上使用TerminateProcess())
    (请注意,不会执行退出处理程序和最后的子句等。请注意,进程的后代进程不会被终止 - 它们将简单地变成孤立的。)
    (注:如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能被其他进程无法使用。 类似地,如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁。)
  • kill(): 杀掉进程,与terminate()相同,但在Unix中使用SIGKILL信号量
  • close(): 关闭进程对象,释放所有与之相关的资源。如果是还在运行,会raised错误ValueError,第一次调用会返回成功,其它调用会raise错误ValueError
  • exitcode: 子进程的退出码。如果不是被terminate终止的,将会是None;如果被信号量N终止的,将会返回-N
  • authkey: 进程的身份钥匙(1字节字符串)。当multipriocessing在主进程中被初始化时,会使用os.urandom()标记一个随机字符串。当一个进程对象被创建时,它将会从父进程继承这个身份钥匙,虽然它可能会被改为其它字节字符串
  • sentinel: (哨兵)当进程结束时,一个数值的系统对象处理将变为ready。如果你想立即要等待几个事件,你能用这个值使用multiprocessing.connection.wait(),否则调用join()更简单。

Pool进程池

如果进程太多,超过了CPU核数,会导致进程之间的来回切换,影响性能。可以通过创建进程池,把进程加入到里面,如果池中进程没满,就会创建一个进程来执行请求;如果池中进程达到规定的最大值,那么请求会等待
函数声明: class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes: 指定池中进程数量,如果不指定,则为None,默认就会使用os.cpu_count()
initializer: 如果不是None,每个进程在开始时都会调用initializer(*initargs)
maxtasksperchild: 工作进程在退出和替换新的工作进程之前,可以完成的任务数量,以释放资源。默认为None,表示工作进程和这个池的生存时间一样长
context: 用来指定工作进程的上下文

下面有2种方法添加进程到进程池中,分别是apply_asyncapply

apply_async

apply_async()用来同步执行进程,允许多个进程同时进入池;是异步非阻塞的
函数声明: apply_async(func[, args[, kwds[, callback[, error_callback]]]])
callback: 如果指定了回调,那么它应该是一个可调用的,它接受一个参数

#coding=utf-8

import multiprocessing
import time     # 延时用

# 子进程要执行的函数
def child_proc(index):
    print(f'{index} process is running')
    time.sleep(3)
    print(f'{index} process is end')

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    print(f'all process start')

    # 生成进程池
    p = multiprocessing.Pool()
    for i in range(5):
        p.apply_async(func = child_proc, args = (i,))
    p.close()
    p.join()    # 注! 如果不执行此句,将会直接退出主进程

    print(f'all process done!')

输出

all process start
process is running
process is running
process is running
process is running
process is running
process is end
process is end
process is end
process is end
process is end
all process done!

apply

apply(): 只允许一个进程进入池,在一个进程结束后,另一个才能进入;是阻塞的
函数声明: apply(func[, args[, kwds]])

p.apply(func = child_proc, args = (i,)) # 将上面代码中的apply_async换成apply即可

输出

all process start
process is running
process is end
process is running
process is end
process is running
process is end
process is running
process is end
process is running
process is end
all process done!

分析:对比apply_async和apply的输出,可以发现,apply_async是同步执行的,而apply是一个一个进入池中执行的


数据共享

参考:Python多进程编程-进程间共享数据
PipeQueue都有一定的数据共享功能,但他们会阻塞进程)
Queue:采用共享队列的内存的方式共享数据
注意:存在queue.Queuemultiprocessing.Queue两种队列
queue.Queue:是进程内非阻塞队列,各进程私有
multiprocessing.Queue:是跨进程通信队列,各个子进程共有

共享内存:使用multiprocessing的ValueArray类,实现共享内存的方式共享数据
共享进程:使用multiprocessing的Manager类,实现共享进程的方式共享数据

获取返回值(仅主进程获有数据)

针对进程池实现的方式,可以直接通过获取进程对象的返回值

#coding=utf-8

import multiprocessing

# 子进程要执行的函数
def child_proc(x, y):
    return x + y

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    # 生成进程池
    p = multiprocessing.Pool()
    z = p.apply(func = child_proc, args = (1, 2))
    print(z)

Queue

使用multiprocessing的Queue类,实现进程之间的数据共享

#coding=utf-8

from multiprocessing import Process, Queue

# 子进程要执行的函数
def child_proc(queue):
    num = queue.get()
    num += 10
    queue.put(num)

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    # 创建共享数据
    queue = Queue()
    queue.put(1000)

    # 创建进程
    p1 = Process(target = child_proc, args = (queue,))
    p2 = Process(target = child_proc, args = (queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    # 打印结果
    print(queue.get())

Value、Array

共享内存有2个结构 - ValueArray,它们内部都实现了锁机制,因此是多进程安全的

#coding=utf-8

from multiprocessing import Process, Value, Array

# 子进程要执行的函数
def child_proc(num, li):
    num.value += 100
    for i in range(len(li)):
        li[i] += 10

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    # 创建共享数据
    num = Value('d', 0.0)
    li = Array('i', range(10))

    # 创建进程
    p1 = Process(target = child_proc, args = (num, li))
    p2 = Process(target = child_proc, args = (num, li))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    # 打印结果
    print(num.value)
    for x in li:
        print(x)

ValueArray都需要设置其中存放值的类型。d:double;i:int;c:char等
详细:转到multiprocessing.sharedctypes可以查看到各种类型的字符串定义

Manager

(上面的共享内存通过Value和Array结构实现,这些值在主进程中管理,很分散)
Manager通过共享内存来实现共享数据,支持的数据类型很多
详细multiprocessing.managers

#coding=utf-8

from multiprocessing import Process, Manager

# 子进程要执行的函数
def child_proc(dict1, list1):
    dict1['yourname'] += ' snow'
    list1[3] += 10

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    # 创建共享数据
    manager = Manager()
    dict1 = manager.dict()
    list1 = manager.list(range(4))

    dict1['yourname'] = 'youmux'
    list1[3] = 0

    # 创建进程
    p1 = Process(target = child_proc, args = (dict1, list1))
    p2 = Process(target = child_proc, args = (dict1, list1))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    # 打印结果
    print(dict1['yourname'])
    for x in list1:
        print(x)

1.参考书籍: 参考书籍:《Python并行编程手册》

2.后台线程: 后台线程在主线程停止后就直接停止运行。他们资源(如打开的文件,数据库事务等)可能不会被正确的释放。如果你想要你的线程优美的停止,让他们不要变为后台和使用一个合适的信号机制如事件Event

相关推荐