Python multiprocessing.pool.apply_async: fixing high memory usage
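multiprocessing.pool.apply_async runs tasks in parallel, but it does not block. Each call queues the task and returns immediately, and the pool keeps a result object for every unfinished task in its pool._cache dictionary. With a modest number of tasks this is harmless. With a very large number of tasks that cannot finish quickly, say 1 million or 10 million, the submitting loop races ahead and the queued tasks pile up, using a lot of memory or even exhausting it. To cap memory usage, you can check the length of pool._cache and, once it exceeds some threshold, make the loop wait for the most recently submitted task before submitting more. Note that _cache is a private, undocumented attribute, so this relies on a CPython implementation detail.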
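A minimal sketch of the behaviour described above, assuming CPython 3.8 or later. It uses multiprocessing.pool.ThreadPool, which subclasses Pool and shares its apply_async bookkeeping, so the demo needs no pickling. A hypothetical threading.Event named `release` holds the workers back; `slow_task` and `pending_after_submit` are illustrative names, not part of any library. Every apply_async call returns at once, and each unfinished task stays in pool._cache until it completes.

```python
import threading
from multiprocessing.pool import ThreadPool

# Hypothetical gate: workers block on it, simulating tasks that cannot finish quickly.
release = threading.Event()


def slow_task(i):
    release.wait()
    return i * i


def pending_after_submit(n_tasks, processes=2):
    """Submit n_tasks without waiting; report how many sit in pool._cache."""
    release.clear()
    with ThreadPool(processes=processes) as pool:
        results = [pool.apply_async(slow_task, (i,)) for i in range(n_tasks)]
        # Every apply_async call has already returned, yet no task has
        # finished, so each one still has an entry in the private _cache.
        pending_before = len(pool._cache)
        release.set()                        # let the workers finish
        values = [r.get() for r in results]
        pool.close()
        pool.join()                          # result handler has drained _cache
        pending_after = len(pool._cache)
    return pending_before, pending_after, values


if __name__ == "__main__":
    print(pending_after_submit(10))
```

With a real Pool and millions of submissions, the same per-task entries (plus the queued arguments) are what grow without bound.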
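from multiprocessing import Pool
import time


def downloadGif(arg):
    print(arg[0])
    time.sleep(1)  # simulate a slow download


def downloading_over(arg):
    pass  # callback, runs in the parent process when a task finishes


def foo(num):
    # Generator: yields one (pic_info, txt_info) pair at a time,
    # so the full list of inputs is never built in memory.
    for i in range(num, 1000001):
        pic_info = []
        pic_info.append(str(i) + 'gif')
        txt_info = []
        txt_info.append(str(i) + 'txt')
        yield pic_info, txt_info


if __name__ == '__main__':
    pool = Pool(processes=5)  # maximum number of worker processes
    count = 1
    for download in foo(2):
        pool.apply_async(func=downloadGif, args=(download[0],), callback=downloading_over)
        last = pool.apply_async(func=downloadGif, args=(download[1],), callback=downloading_over)
        count = count + 1
        print(count)
        # Throttle: once too many tasks are pending, block on the newest one.
        if len(pool._cache) > 1e3:
            print("waiting for cache to clear...")
            last.wait()
    # Measured memory usage for different thresholds:
    # 1e3 (about 500 loop iterations): ~10 MB
    # 1e4 (about 5,000 loop iterations): ~20 MB
    # 1e5 (about 50,000 loop iterations): ~200 MB
    # 1e6 (about 500,000 loop iterations): ~2,000 MB
    pool.close()
    pool.join()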
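The check above reads pool._cache, which may change between Python versions. As an alternative that uses only the public API (a swapped-in technique, not the author's method), one can keep the AsyncResult objects in a collections.deque and, once max_pending of them are outstanding, wait on the oldest. The names `run_bounded` and `download` (a stand-in for downloadGif) and the `max_pending` default are hypothetical.

```python
import collections
import time
from multiprocessing import Pool


def download(name):
    """Hypothetical stand-in for the article's downloadGif."""
    time.sleep(0.01)
    return name


def run_bounded(pool, func, items, max_pending=1000, callback=None):
    """Submit func(item) for each item, never keeping more than max_pending
    unfinished AsyncResult objects alive. Uses only public Pool methods."""
    if max_pending < 1:
        raise ValueError("max_pending must be at least 1")
    pending = collections.deque()
    for item in items:                      # items may be a lazy generator
        if len(pending) >= max_pending:
            pending.popleft().wait()        # block on the OLDEST outstanding task
        pending.append(pool.apply_async(func, (item,), callback=callback))
    while pending:                          # wait for the tail of the work
        pending.popleft().wait()


if __name__ == "__main__":
    names = (str(i) + "gif" for i in range(2, 202))   # lazy, like foo()
    with Pool(processes=5) as pool:
        run_bounded(pool, download, names, max_pending=50)
```

Waiting on the oldest task frees a slot as early as possible; the trade-off is that a single slow task can stall submission even when later tasks have already finished. Because `items` can be a generator such as foo(), only about max_pending tasks' arguments and result objects are alive at any time.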
The key lines:
if len(pool._cache) > 1e3:
    print("waiting for cache to clear...")
    last.wait()
Here last is an instance of AsyncResult, the object returned by pool.apply_async().
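Why waiting on last is enough: a sketch, assuming CPython's current Pool internals, where tasks reach the workers in submission order through a FIFO queue and a task leaves _cache right after its result is set. Once the newest task has finished, every older task has at least started, so at most processes - 1 older tasks can still be running, and the cache drops to roughly `processes` entries. The code replays the article's loop on a ThreadPool and records len(pool._cache) after each wait; `task`, `throttle_on_last`, and `limit` are hypothetical names.

```python
import time
from multiprocessing.pool import ThreadPool


def task(i):
    time.sleep(0.02)        # stand-in for downloadGif's time.sleep(1)
    return i


def throttle_on_last(n_tasks, processes=5, limit=20):
    """Replay the article's loop and record len(pool._cache) after each wait."""
    sizes_after_wait = []
    with ThreadPool(processes=processes) as pool:
        for i in range(n_tasks):
            last = pool.apply_async(task, (i,))
            if len(pool._cache) > limit:     # same check as the article
                last.wait()                  # newest task has finished...
                sizes_after_wait.append(len(pool._cache))  # ...so few remain
        pool.close()
        pool.join()
    return sizes_after_wait


if __name__ == "__main__":
    sizes = throttle_on_last(200)
    print(len(sizes), max(sizes))
```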
https://docs.python.org/3/library/multiprocessing.html
class multiprocessing.pool.AsyncResult
The class of the result returned by Pool.apply_async() and Pool.map_async().

get([timeout]): Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().

wait([timeout]): Wait until the result is available or until timeout seconds pass.

ready(): Return whether the call has completed.

successful(): Return whether the call completed without raising an exception. Will raise ValueError if the result is not ready.
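A sketch exercising the four AsyncResult methods listed above, assuming Python 3.7 or later (where successful() raises ValueError rather than AssertionError on an unfinished call). It uses ThreadPool so the functions need not be picklable; `slow_square`, `fail`, and `inspect_async_result` are hypothetical helpers.

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def slow_square(x, delay):
    time.sleep(delay)
    return x * x


def fail():
    raise ZeroDivisionError("boom")


def inspect_async_result():
    """Record what each AsyncResult method reports at different moments."""
    seen = {}
    with ThreadPool(processes=2) as pool:
        res = pool.apply_async(slow_square, (7, 0.5))
        seen["ready_early"] = res.ready()            # call still running
        try:
            res.successful()                         # not ready -> ValueError
        except ValueError:
            seen["successful_early"] = "ValueError"
        try:
            res.get(timeout=0.05)                    # gives up after 0.05 s
        except multiprocessing.TimeoutError:
            seen["get_timeout"] = "TimeoutError"
        res.wait()                                   # block until available
        seen["value"] = res.get()
        seen["successful"] = res.successful()

        bad = pool.apply_async(fail)
        bad.wait()
        seen["bad_successful"] = bad.successful()
        try:
            bad.get()                                # re-raises the worker's error
        except ZeroDivisionError as exc:
            seen["bad_get"] = str(exc)
    return seen


if __name__ == "__main__":
    print(inspect_async_result())
```

Note that wait() never raises for a failed call; only get() surfaces the exception, so a throttling loop that relies on wait() alone will not notice failing tasks unless it also passes an error_callback or calls get().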
This article is based on the answer at the following link: