Practical Python Multiprocessing Techniques: A Summary
1. Why Choose Multiprocessing
As mentioned in the multithreading article, Python has a GIL (Global Interpreter Lock) that prevents a single process from exploiting multiple CPU cores. For CPU-heavy operations (e.g. computation, image processing), multithreading therefore does not perform well, so we turn to multiprocessing, which can fully utilize the CPU cores and improve efficiency. (For I/O-heavy operations, prefer multithreading; the GIL does not hurt performance there.)
Note: this article uses Python 3, but apart from the difference in print, everything works the same under Python 2.
2. A Direct Comparison of Multithreading and Multiprocessing
1. Computing the Fibonacci sequence, to compare the execution speed of the two approaches
1. Multithreading approach
Example code:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

with ThreadPoolExecutor(3) as executor:
    tasks = [executor.submit(fib, num) for num in range(25, 35)]
    start_time = time.time()
    for future in as_completed(tasks):
        data = future.result()
        print("exe result {num}".format(num=data))
    print("last time is {time}".format(time=time.time() - start_time))
Result:
exe result 75025
exe result 121393
exe result 196418
exe result 317811
exe result 514229
exe result 832040
exe result 1346269
exe result 2178309
exe result 3524578
exe result 5702887
last time is 3.65224289894104
2. Multiprocessing approach:
Example code:
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        tasks = [executor.submit(fib, num) for num in range(25, 35)]
        start_time = time.time()
        for future in as_completed(tasks):
            data = future.result()
            print("exe result {num}".format(num=data))
        print("last time is {time}".format(time=time.time() - start_time))
Result:
exe result 75025
exe result 121393
exe result 196418
exe result 317811
exe result 514229
exe result 832040
exe result 1346269
exe result 2178309
exe result 3524578
exe result 5702887
last time is 2.270967960357666
Conclusion:
The timings make it clear that the multiprocessing approach computes faster (2.27 s vs. 3.65 s): each worker process has its own interpreter and its own GIL, so the CPU-bound work runs in parallel on multiple cores.
2. A task with frequent I/O operations, to compare the execution speed of the two approaches
1. Multithreading approach
Example code:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    with ThreadPoolExecutor(3) as executor:
        tasks = [executor.submit(random_sleep, num) for num in [2] * 30]
        start_time = time.time()
        for future in as_completed(tasks):
            data = future.result()
            print("exe result {num}".format(num=data))
        print("last time is {time}".format(time=time.time() - start_time))
Result:
last time is 20.006227493286133
2. Multiprocessing approach:
Example code:
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        tasks = [executor.submit(random_sleep, num) for num in [2] * 30]
        start_time = time.time()
        for future in as_completed(tasks):
            data = future.result()
            print("exe result {num}".format(num=data))
        print("last time is {time}".format(time=time.time() - start_time))
Result:
last time is 20.205044746398926
Conclusion:
For I/O operations the two approaches take almost the same time (20.01 s vs. 20.21 s), because blocking I/O releases the GIL and threads overlap it just as well as processes do. Since threads are much cheaper to create and switch than processes, multithreading is the better choice for I/O-heavy tasks.
3. Multiprocessing with the multiprocessing Module
Unlike threads, processes have completely isolated data and therefore cannot communicate through global variables.
For multiprocess programming, the first choice is still the concurrent.futures package used in the tests above: its interface is exactly the same as for multithreading, and it is well designed. The multiprocessing module covered below is more low-level.
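To make "the same interface" concrete, here is a minimal sketch (the helper names square and run_pool are invented for illustration): the only thing that changes between the threaded and the process-based version is which executor class is passed in.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(n):
    return n * n

def run_pool(executor_cls):
    # executor_cls can be either pool class; the calling code is unchanged
    with executor_cls(3) as executor:
        return list(executor.map(square, range(5)))

if __name__ == '__main__':
    print(run_pool(ThreadPoolExecutor))   # [0, 1, 4, 9, 16]
    print(run_pool(ProcessPoolExecutor))  # same result, computed in worker processes

The rest of this section uses the lower-level multiprocessing module directly.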
Example code:
import multiprocessing
import time

def get_html(n):
    time.sleep(n)
    print("sub process success")
    return n

if __name__ == '__main__':
    process = multiprocessing.Process(target=get_html, args=(2,))
    process.start()
    print(process.pid)  # pid of the child process
    process.join()
    print("main process end")
Result:
29244
sub process success
main process end
4. Using a Process Pool
Example code:
import multiprocessing
import time

def get_html(n):
    time.sleep(n)
    print("sub process success")
    return n

if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())  # create a process pool
    result = pool.apply_async(get_html, args=(3,))  # submit a task to the pool
    pool.close()  # no new tasks may be submitted after close()
    pool.join()   # wait for all worker processes to finish
    print(result.get())
Result:
sub process success
3
Of course, we may want to iterate over the pool's results as they are produced. No problem: Python provides pool.imap() for exactly that.
Example code:
import multiprocessing
import time

def get_html(n):
    time.sleep(n)
    print("sub process success")
    return n

if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())  # create a process pool
    # imap yields results in the order the arguments were submitted
    for result in pool.imap(get_html, [1, 3, 5, 7, 9]):
        print("sleep {time} success".format(time=result))
Result:
sub process success
sleep 1 success
sub process success
sleep 3 success
sub process success
sleep 5 success
sub process success
sleep 7 success
sub process success
sleep 9 success
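A related design note, offered as an aside: pool.imap() preserves submission order, so a slow early task can hold back results that are already finished. When order does not matter, the standard library's pool.imap_unordered() yields each result as soon as it completes. A minimal sketch reusing get_html from above:

import multiprocessing
import time

def get_html(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    # results arrive in completion order, not submission order
    for result in pool.imap_unordered(get_html, [5, 3, 1]):
        print("sleep {time} success".format(time=result))  # likely 1, 3, 5
    pool.close()
    pool.join()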
5. Inter-Process Communication
Method 1: use a process Queue
Example code:
from multiprocessing import Process, Queue
import time

def Producer(queue):
    queue.put("a")
    time.sleep(2)

def Consumer(queue):
    time.sleep(2)
    res = queue.get()
    print(res)

if __name__ == '__main__':
    queue = Queue(10)
    task1 = Process(target=Producer, args=(queue,))
    task2 = Process(target=Consumer, args=(queue,))
    task1.start()
    task2.start()
    task1.join()
    task2.join()
Result:
a
Note:
1. Communicating through shared global variables does not work across processes: when a process is forked, every variable is copied into the child's own address space, so processes are isolated from one another (the sketch below demonstrates this).
2. Queue() cannot be used for communication between the processes of a process pool.
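A minimal sketch of note 1, using nothing beyond the standard library: the child process increments a module-level variable, but the parent's copy never changes.

from multiprocessing import Process

counter = 0  # module-level variable

def increment():
    global counter
    counter += 100
    print("child sees counter =", counter)   # 100, but only in the child's copy

if __name__ == '__main__':
    p = Process(target=increment)
    p.start()
    p.join()
    print("parent sees counter =", counter)  # still 0: memory is not shared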
Method 2: use the Queue from Manager to communicate inside a process pool
Example code:
from multiprocessing import Pool, Manager
import time

def Producer(queue):
    queue.put("a")
    time.sleep(2)

def Consumer(queue):
    time.sleep(2)
    res = queue.get()
    print(res)

if __name__ == '__main__':
    queue = Manager().Queue(10)
    pool = Pool(2)
    pool.apply_async(Producer, args=(queue,))
    pool.apply_async(Consumer, args=(queue,))
    pool.close()
    pool.join()
Result:
a
Method 3: use a Pipe to communicate between two processes
A pipe can only be used for communication between two processes, but it is more efficient than a queue.
Example code:
from multiprocessing import Process, Pipe

def Producer(pipe):
    pipe.send("a")

def Consumer(pipe):
    print(pipe.recv())

if __name__ == '__main__':
    receive_pipe, send_pipe = Pipe()
    task1 = Process(target=Producer, args=(send_pipe,))
    task2 = Process(target=Consumer, args=(receive_pipe,))
    task1.start()
    task2.start()
    task1.join()
    task2.join()
Result:
a
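One detail the example does not exercise: by default Pipe() returns a duplex connection, so either end can both send and receive. A small sketch of that (the worker function is invented for illustration):

from multiprocessing import Process, Pipe

def worker(conn):
    msg = conn.recv()        # receive on the child's end
    conn.send(msg.upper())   # and reply on the same end
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send("ping")
    print(parent_conn.recv())  # PING
    p.join()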
6. Sharing Memory Between Processes
Example code:
from multiprocessing import Process
import multiprocessing

def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    process_dict = mgr.dict()  # a dict proxy that all processes can operate on
    first_process = Process(target=add_data, args=(process_dict, "Bob", 22))
    second_process = Process(target=add_data, args=(process_dict, "Alice", 20))
    first_process.start()
    second_process.start()
    first_process.join()
    second_process.join()
    print(process_dict)
Result:
{'Bob': 22, 'Alice': 20}
Explanation:
As the output shows, although the writers are different processes, they operated on one and the same variable through the Manager proxy.
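A closing caveat, offered as a sketch rather than as part of the original recipe: a Manager proxy does not make compound read-modify-write operations atomic, so when several processes update the same key you still need a lock. Manager() can provide one (mgr.Lock()) that is safe to share across processes:

from multiprocessing import Process
import multiprocessing

def add_one(p_dict, lock, key):
    for _ in range(100):
        with lock:  # guard the read-modify-write as one step
            p_dict[key] = p_dict.get(key, 0) + 1

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    shared = mgr.dict()
    lock = mgr.Lock()  # a lock proxy usable across processes
    workers = [Process(target=add_one, args=(shared, lock, "count")) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(shared["count"])  # reliably 200 with the lock held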