使用并发加速你的python程序(2)
asyncio 版本
在开始研究asyncio示例代码之前,我们来更多地讨论一下asyncio是如何工作的。
asyncio基础知识
这将是asycio的一个简化版本。这里有很多细节被忽略了,但它仍然解释了它是如何工作的。
asyncio的一般概念是一个单个的Python对象,称为事件循环,它控制每个任务如何以及何时运行。事件循环会关注每个任务并知道它处于什么状态。在实际中,任务可以处于许多状态,但现在我们假设一个简化的只有两种状态的事件循环。
就绪状态将表明一个任务有工作要做,并且已经准备好运行,而等待状态意味着该任务正在等待一些外部工作完成,例如网络操作。
我们简化的事件循环维护两个任务列表,每一个对应这些状态。它会选择一个就绪的任务,然后重新启动它。该任务处于完全控制之中,直到它配合地将控制权交还给事件循环为止。
当正在运行的任务将控制权交还给事件循环时,事件循环将该任务放入就绪或等待列表中,然后遍历等待列表中的每个任务,以查看I/O操作完成后某个任务是否已经就绪。时间循环知道就绪列表中的任务仍然是就绪的,因为它知道它们还没有运行。
一旦所有的任务都重新排序到正确的列表中,事件循环将选择下一个要运行的任务,然后重复这个过程。我们简化的事件循环会选择等待时间最长的任务并运行该任务。此过程会一直重复,直到事件循环结束。
asyncio的一个重要之处在于,如果没有刻意去释放控制权,任务是永远不会放弃控制权的。它们在操作过程中从不会被打断。这使得我们在asyncio中比在threading中能更容易地共享资源。你不必担心代码是否是线程安全的。
这是从高级层面来看待asyncio的运行过程的。如果你想更深入地了解更多细节,这个StackOverflow的答案(https://stackoverflow.com/a/51116910/6843734 )提供了一些很好的细节。
async 和 await
现在我们来讨论添加到Python中的两个新关键字: async和await。根据上面的讨论,你可以将await看作是允许任务将控制权交还给事件循环的魔法。当你的代码在等待函数调用时,这是一个信号,表明调用可能需要一段时间,并且任务应该放弃控制。
最简单的方式是将async看作Python的一个标志,它会告诉Python将要定义的函数会使用await。在某些情况下,这并不完全正确,比如异步生成器(https://www.python.org/dev/peps/pep-0525/ ),但是它适用于多数情况,并在你开始时为你提供一个简单的模型。
在下一段代码中,你将看到一个例外,那就是async with语句,它会从你通常需要等待的对象创建一个上下文管理器。虽然语义略有不同,但思想是相同的: 将这个上下文管理器标记为可以交换出去的东西。
我相信你可以想象,在管理事件循环和任务之间的交互时存在一些复杂性。对于刚开始使用asyncio的开发人员来说,这些细节并不重要,但是你需要记住,任何调用await的函数都需要使用async进行标记。否则你会得到一个语法错误。
返回到代码
现在,你已经基本了解了什么是asyncio,我们来浏览一下示例代码的asyncio版本,并找出它是如何工作的。注意,这个版本添加了aiohttp。在运行示例代码之前,你应该先运行pip install aiohttp:
这个版本比前两个版本稍微复杂一些。它具有类似的结构,但是设置任务的工作量要比创建ThreadPoolExecutor多一些。让我们从示例的顶部开始。
download_site ()
除了函数定义行上的async关键字和实际调用session.get()时的async with关键字之外,download_site()的顶部几乎与threading版本相同。稍后你将看到为什么Session可以在这里传递,而不是使用线程本地存储。
download_all_sites()
download_all_sites()与threading示例中的相比变化很大。
你可以在所有任务之间共享会话,因此会话在这里作为上下文管理器创建。这些任务可以共享会话,因为它们都在同一个线程上运行。当会话处于糟糕(bad)状态时,一个任务不可能中断另一个任务。
在该上下文管理器中,它使用asyncio.ensure_future()创建一个任务列表,该列表还负责启动这些任务。所有任务创建之后,这个函数会使用asyncio.gather()保持会话上下文处于活动状态,直到所有任务都完成为止。
threading代码也执行类似的操作,但是细节都是在ThreadPoolExecutor中进行处理。目前还没有AsyncioPoolExecutor类。
然而,这里的细节中隐藏着一个小而重要的变化。还记得我们是怎样讨论要创建线程的数量的吗?在threading示例中,最优线程数是多少并不明显。
asyncio的一个很酷的优点是它的伸缩性比threading好得多。与一个线程相比,创建一个任务需要更少的资源和更少的时间,因此创建和运行更多的任务也会运行良好。这个例子只是为每个要下载的站点创建了一个单独的任务,运行效果非常好。
__main__
最后,asyncio的本质意味着你必须启动事件循环并告诉它运行哪些任务。文件底部的__main__部分包含get_event_loop()和run_until_complete()函数的代码。如果没有其它东西的话,这两个函数在命名那些调用函数方面做得很好。
如果你已经更新到Python 3.7, Python核心开发人员已经为你简化了此语法。你只需要使用asyncio.run()代替拗口的asyncio.get_event_loop().run_until_complete()。
为什么asyncio版本很棒
因为它真的很快! 在我的机器上进行的测试中,这是速度最快的代码版本:
执行时序图看起来与threading示例中的情况非常相似。只是I/O请求都是由同一个线程完成的:.
缺少像ThreadPoolExecutor这样漂亮的包装器使得这段代码比threading示例稍微复杂一些。在这种情况下,你必须做一些额外的工作才能获得更好的性能。
还有一个常见的争论是,必须在适当的位置添加async和await是一个额外的复杂性。在某种程度上,这是正确的。这个争论的另一个方面是,它迫使你考虑何时将给定的任务交换出去,这可以帮助你创建一个更好、更快的设计。
伸缩性问题在这里也很突出。为每个站点使用一个线程运行上面的threading示例明显比使用几个线程运行的慢一些。但是,运行包含数百个任务的asyncio示例根本不会降低速度。
asyncio版本的问题
此时asyncio有两个问题。你需要特殊的异步版本的库来充分利用asycio。如果你只是使用requests下载站点,那么速度会慢得多,因为requests的设计目的不是通知事件循环它被阻塞了。随着时间的推移,这个问题变得微不足道,因为越来越多的库包含了asyncio。
另一个更微妙的问题是,如果其中一个任务不合作,那么协作多任务处理的所有优势都将不存在。代码中的一个小错误可能会导致任务运行超时并长时间占用处理器,使需要运行的其他任务无法运行。如果一个任务没有将控制权交还给事件循环,则事件循环无法中断它。
考虑到这一点,我们来开始讨论一种完全不同的并发性——multiprocessing。
multiprocessing 版本
与前面的方法不同,代码的multiprocessing版本充分利用了你的很酷的新计算机具有的多个CPU。或者,对我来说,就是我那台笨重的旧笔记本电脑。话不多说,我们先从代码开始:
这比asyncio示例要短得多,而且实际上看起来与threading示例非常类似,但是在深入研究代码之前,让我们快速浏览一下multiprocessing为你做了什么。
简单介绍multiprocessing
到目前为止,本文中的所有并发示例都只在计算机中的单个CPU或核心上运行。这样做的原因与当前CPython的设计和全局解释器锁(或者GIL)有关。
本文不会深入探讨GIL(https://realpython.com/python-gil/ )的原理和原因。你现在只需要知道这个示例的同步、threading和asyncio版本都运行在一个CPU上就足够了。
标准库中的multiprocessing旨在打破这种障碍,并使你的代码在多个CPU上运行。在高级层面来看,它通过创建一个新的Python解释器实例并在每个CPU上运行,然后将程序的一部分外包给每个CPU来实现这一点。
你可以想象,在当前的Python解释器中启动一个单独的Python解释器并不像启动一个新的线程那么快。这是一个重量级的操作,会伴随着一些限制和困难,但是对于正确的问题,它可以产生巨大的影响。
multiprocessing 代码
与我们的同步版本相比,这里的代码有一些小的更改。第一个更改位于download_all_sites()中。代替简单地重复调用download_site(),它创建了一个multiprocessing.Pool对象,并让它将download_site映射到可迭代的sites上。这在threading示例中应该很熟悉。
在这里,Pool创建了许多单独的Python解释器进程,并让每个进程在可迭代对象中的某些项上运行指定的函数,在我们的例子中,这个可迭代对象是站点列表。主进程与其他进程之间的通信由multiprocessing模块为你处理。
你需要注意创建Pool的行。首先,它没有指定要在Pool中创建多少个进程,尽管这是一个可选参数。默认情况下,multiprocessing.Pool()将确定你计算机中的CPU数量并自动与之匹配。这通常是最好的答案,在我们的例子中也是如此。
对于这个问题,增加进程的数量并没有使程序变得更快。它实际上降低了速度,因为设置和销毁所有这些进程的成本要大于并行处理I/O请求的好处。
接下来是该调用的initializer=set_global_session部分。请记住,Pool中的每个进程都有自己的内存空间。这意味着它们不能共享诸如Session对象之类的东西。你不希望每次调用函数时都创建一个新Session,而是希望为每个进程都创建一个。
initializer函数参数就是为这种情况而构建的。没有办法将返回值从initializer传递回进程download_site()调用的函数,但是可以初始化一个全局session变量来保存每个进程的单个会话。因为每个进程都有自己的内存空间,所以每个进程的全局内存空间是不同的。
代码的更改基本就是这些。代码的其余部分与你之前看到的非常相似。
为什么 multiprocessing版本很棒
这个示例的multiprocessing版本非常棒,因为它相对容易设置,并且只需要很少的额外代码。它还充分利用了计算机的CPU处理能力。这段代码的执行时序图如下:
multiprocessing 版本的问题
这个版本的示例确实需要一些额外的设置,而且全局session对象很奇怪。你必须花一些时间考虑在每个进程中访问哪些变量。
最后,它明显比本例中的asyncio和threading版本要慢:
这并不奇怪,因为I/O限制问题并不是multiprocessing存在的真正原因。在进入下一节并查看CPU限制的示例时,你将看到更多原因。
如何加速CPU密集型程序
我们来换个话题。到目前为止的所有示例都处理了I/ O密集型问题。现在,你将研究一个CPU密集型问题。正如你所看到的,一个I/ O密集型问题将花费大部分时间来等待外部操作(如网络调用)完成。另一方面,CPU密集型问题很少执行I/O操作,它的总体执行时间是影响它处理所需数据的速度的一个因素。
对于我们的示例,我们将使用一个傻瓜函数来创建一些需要很长时间在CPU上运行的东西。这个函数计算从0到传入值的每个数字的平方和:
如果你传入了一个很大的数字,这可能需要一段时间。记住,这只是代码的占位符,它实际上做了一些有用的事情,并且需要大量的处理时间,比如计算方程的根或对大型数据结构进行排序。
CPU密集型的同步版本
现在我们来看看这个例子的非并发版本:
这段代码调用cpu_bound() 20次,每次调用都使用一个不同的较大的数值。它在一个CPU上的一个进程中的一个线程上完成所有这些。执行时序图如下:
与I/ O密集型的示例不同,CPU密集型的示例在运行时通常相当一致。在我的机器上,这个过程大约需要7.8秒:
显然我们可以做得更好。这都是在没有并发的单个CPU上运行的。我们来看看能不能做得更好。
threading 和 asyncio 版本
你认为使用 threading 或 asyncio重写这段代码会加速多少?
如果你回答“一点也不会加速”,给自己一块饼干。如果你回答“它会让速度慢下来”,那就给自己两块饼干。
原因如下: 在上面的I/ O密集型示例中,大部分执行时间都花在等待缓慢的操作完成上。threading 和 asyncio允许你重叠等待的时间,而不是按顺序执行,从而加快了速度。
然而,对于CPU密集型问题,是没有等待的。CPU正在以最快的速度运行以完成这个问题。在Python中,线程和任务在同一个CPU上的同一个进程中运行。这意味着这一个CPU要执行所有非并发代码的工作,以及设置线程或任务的额外工作。这个过程需要超过10秒:
我已经编写了这段代码的一个threading版本,并将它与其他示例代码放在GitHub 仓库(https://github.com/realpython/materials/tree/master/concurrency-overview)中,这样你就可以自己进行测试了。不过,我们现在先不看这个版本。
CPU密集型的multiprocessing 版本
现在你终于到达了 multiprocessing真正闪耀的地方。与其他并发库不同, multiprocessing被显式地设计为通过跨多个CPU来共享繁重的CPU工作负载。下面是它的执行时序图:
代码如下:
与非并发版本相比,这段代码只需要更改一点点。你必须import multiprocessing,然后只需要更改从“循环遍历数值”到“创建一个multiprocessing.Pool对象,并使用其.map()方法向空闲的工作进程发送单独的编号”的部分。
这正是你为I/O密集型的multiprocessing代码所做的,但是在这里你不需要担心Session对象。
如上所述,你应该注意 multiprocessing.Pool()构造函数的processes可选参数。你可以指定想要在Pool.中创建和管理多少Process对象。默认情况下,它将确定你的计算机中有多少个CPU,并为每个CPU创建一个进程。虽然这对于我们的简单示例非常有用,但是在生产环境中你可能想要有更多的控制权。
另外,正如我们在关于threading的第一节中提到的,multiprocessing.Pool代码是构建在Queue 和 Semaphore等构建块之上的,对于使用其他语言编写多线程和多进程代码的人来说,这些构建块是很熟悉的。
为什multiprocessing版本很棒
这个示例的multiprocessing版本非常棒,因为它相对容易设置,并且只需要很少的额外代码。它还充分利用了计算机的CPU处理能力。
这就是上次我们查看multiprocessing时我所说的。最大的不同在于,这一次它显然是最佳选择。它在我的机器上运行只需要2.5秒:
这比我们看到的其他选项要好得多。
multiprocessing 版本的问题
使用multiprocessing也有一些缺点。在这个简单的例子中,它们并没有真正显示出来,但是将你的问题拆分,以便每个处理器能够独立工作,这有时是很困难的。
此外,许多解决方案都需要流程之间进行更多的通信。这可能会给你的解决方案增加一些复杂性,而非并发程序则不需要处理这些复杂性。
什么时候使用并发
这里已经介绍了很多基础知识,所以我们来回顾一些关键思想,然后再讨论一些决策点,这些决策点将帮助你确定要在项目中使用哪个并发模块(如果可用的话)。
这个过程的第一步是决定你是否应该使用并发模块。虽然这里的示例使每个库看起来都非常简单,但是并发性总是伴随着额外的复杂性,并且常常会导致难以发现的bug。
不要添加并发,直到出现已知的性能问题,然后确定需要哪种类型的并发。正如Donald Knuth所说,“不成熟的优化是编程中所有罪恶的根源(或者至少是大部分罪恶的根源)。”
一旦你决定要优化程序,下一步就是确定你的程序是CPU密集型还是I/O密集型。请记住,I/ O密集型程序是那些将大部分时间花在等待某些事情发生上的程序,而CPU密集型程序则将时间花在尽可能快地处理数据或运算数字上。
正如你所看到的,CPU密集型问题只会从使用 multiprocessing中获益。而使用threading和syncio在解决这类问题上一点帮助也没有。
对于I/ O密集型问题,Python社区中有一条通用的经验法则:“可以使用时使用asyncio,必须使用时使用threading。”asyncio可以为这类程序提供最佳的速度提升,但是有时你需要使用一些没有被移植的关键库来利用asyncio。请记住,任何不将控制权交给事件循环的任务都会阻塞所有其他任务。
结论
现在你已经看过了Python中可用的基本并发类型:
- threading
- asyncio
- multiprocessing
你已经了解了应该使用哪种并发方法来解决给定的问题,或者是否应该使用任何并发方法! 此外,你也对在使用并发时可能出现的一些问题有了很好的理解。
我希望你已经从本文中学到了很多,并且发现了并发性在你的项目中的巨大用处!
英文原文:https://realpython.com/python-concurrency/ 译者:Nothing