Python中快速简单的并行化
几乎所有可用的计算机都具有一定的并行化能力。我正在使用的计算机已有7年历史,拥有单个处理器,但具有8个内核(四个核心,八个线程)的能力。
幸运的是,流行包中的函数(如scikit-learn模型)具有适应并行化的参数,如n_jobs。然而,Python中的许多日常数据操作并没有充分利用我们计算机固有的这些现成的功能。重复的迭代操作会一直抓取,并且永远持续下去,比如文本清理和自然语言处理的数据准备。
在我的工作过程中,我介绍了Python的并行化功能 - 并且对我的工作流程进行了补充。
在python中实现多处理和joblib库的并行化实际上非常简单。
import multiprocessing
from joblib import Parallel, delayed
num_cores = multiprocessing.cpu_count()
inputs = myList
if __name__ == "__main__":
processed_list = Parallel(n_jobs=num_cores)(delayed(my_function(i,parameters)
for i in inputs)
代码解释
multiprocessing.cpu_count() 获取核心数量,并扩展计算机可以处理的作业数量。
inputs存储我们希望我们的函数迭代的项目集。
if __name__ == "__main__":这设置了我们的并行进程在__main__模块内运行。
processed_list 是保持我们的功能结果的一个对象。
从右到左
delayed(my_function(i,parameters) for i in inputs)幕后创建函数的数组,I和 parameters,用于每次迭代。Delayed创建这些元组,然后Parallel将这些传递给解释器。
Parallel(n_jobs=num_cores)多处理器来执行繁重作业。Parallel将Python解释器分成许多与作业数量相等的进程(以及可扩展的核心数量)。每个进程将运行一次迭代,并返回结果。
实施
编写可并行化的自定义函数有点棘手,因为它需要我们更加仔细地思考输入和输出。那是什么意思?
如果我们写了一个函数,比如:
def my_function(myList):
for element in myList:
do something
return result
该函数将循环列表中的每一个元素,做一些事情,然后返回结果。使用Parallel和delayed(), 我们不是将元素列表传递给 my_function, 而是一次传递单个元素myListto 给my_function 。
在上面的例子中,如果myList是一个字符串列表,Parallel和delayed()会遍历字符串中的每个字符!
现在考虑输出:如果我们期望从函数中得到的结果应该是myList平方中的每个元素的和,那么当我们的并行函数返回一个平方元素的列表时,我们会感到惊讶。
def my_function(myList):
return np.sum([item ** 2 for item in myList])
因为这些小的操作以tuple的形式分散在所有的核心上,每个作业处理1个元素,然后将所有的元素放在一起。
在我们上面的例子中,当我们期望my_function返回1 quantity,Parallel并delayed()为每个元素返回1 quantity!
附件:
在tqdm()中包装myList是一种方便的方法来监视并行化过程的进展,并看到它的好处。
import multiprocessing
from joblib import Parallel, delayed
from tqdm import tqdm
num_cores = multiprocessing.cpu_count()
inputs = tqdm(myList)
if __name__ == "__main__":
processed_list = Parallel(n_jobs=num_cores)(delayed(myfunction)(i,parameters) for i in inputs)