【Python】协程
纸上得来终觉浅,绝知此事要躬行。
生成器如何进化成协程
在了解协程之前,我们先回顾一下生成器,看看生成器的原理,下面是一段代码示例:
def simple_coroutine(): # ? print(‘-> coroutine started‘) x = yield # ? print(‘-> coroutine received:‘, x) my_cor = simple_coroutine() print(my_cor) # ? next(my_cor) # ? my_cor.send(40) # ? >>> <generator object simple_coroutine at 0x7ff152727228> >>> -> coroutine started >>> -> coroutine received: 40 >>> Traceback (most recent call last): # ? File "/home/ydongy/face-test/多任务/协程yield.py", line 13, in <module> my_cor.send(40) StopIteration
这是一个简单的生成器,我们分析一下整段代码的过程:
? 生成器函数定义: 定义体中有 yield 关键字。
? yield 在表达式中使用; 如果只是接收数据, 那么产出的值是 None——这个值是隐式指定的, 因为 yield 关键字右边没有表达式。
? 调用函数得到生成器对象
? 首先要调用 next(...) 函数, 因为生成器还没启动, 没在 yield 语句处暂停, 所以一开始无法发送数据。就算发送也只能send(None)
,我们通常把这一步骤称为预激。
? 调用这个方法后, yield 表达式会计算出 40; 现在, 协程会恢复, 一直运行到下一个 yield 表达式, 或者终止。
? 程序运行到末尾, 导致生成器像往常一样抛出 StopIteration异常。
ok,一个简单生成器的执行流程大概就是这个样子,其实它就是协程的基本行为。我们再通过一个例子进一步感受一下生成器进化成协程的过程:
def simple_coroutine(num): print(‘-> coroutine started :num = ‘, num) a = yield num print(‘-> coroutine received :a = ‘, a) b = yield num + a print(‘-> coroutine received :b = ‘, b) my_cor = simple_coroutine(10) print(my_cor) # ? next(my_cor) # ? my_cor.send(40) # ? my_cor.send(50) # ? >>> <generator object simple_coroutine at 0x7fa0dc3cf228> >>> -> coroutine started :num = 10 >>> -> coroutine received :a = 40 >>> -> coroutine received :b = 50 >>> Traceback (most recent call last): # ? File "/home/ydongy/face-test/多任务/协程yield.py", line 30, in <module> my_cor.send(50) StopIteration
代码还是上面那个例子,我们在调用的时候传入了一个参数,以及在函数中多定义了一个yield,我们继续分析一下它的过程:
?函数调用返回一个生成器对象
?调用next(...)方法对协程预激,打印num,此时程序暂定在a = yield num
右侧,等待为a
赋值。
?调用send(40)
,程序接着上次运行的位置,也就是为a
赋值,完成之后继续向下运行打印a
,之后程序再次暂定在b = yield num+a
的右侧,等待为b
赋值
?接着继续调用send(50)
,程序接着上次位置运行,把num+a
的值赋给b
,然后打印b
?此时的程序运行到结尾,同样抛出一个StopIteration
的异常
这次分析这个过程我们发现关键的一点是, 协程在 yield 关键字所在的位置暂停执行,怎么个暂定法?其实就是在赋值语句的右侧暂定,等待再次激活协程时才会设定值,把值赋给左侧的变量,继续往下执行,知道再次遇到一个yield(依然暂定在右侧)或者程序结束抛出异常。
预激协程装饰器
我们发现,如果不预激,就无法将函数运行到yield
关键值位置的右侧,那么协程基本就没啥用,也就是在调用send(...)
之前一定要执行next(...)
,或者执行send(None)
,这个None
的参数是必须的,不可以为其他的值,否则会抛出异常。
为了简化协程的用法, 有时会使用一个预激装饰器。我们仍然使用上面的例子,实现一个装饰器:
from functools import wraps def coroutine(func): @wraps(func) def primer(*args, **kwargs): gen = func(*args, **kwargs) next(gen) return gen return primer @coroutine def simple_coroutine(num): ......
这个装饰器实现的原理比较简单,就是在闭包内部函数执行被装饰器的函数之后提前调用一次next(...)
,然后再把返回生成器
终止协程和异常处理
当我们的协程中发生未处理的异常,会导致我们协程终止,在继续send(...)
的时候,由于没有处理异常,如果试图重新激活协程, 会抛出
StopIteration 异常。
示例:
In [1]: from functools import wraps ...: ...: ...: def coroutine(func): ...: @wraps(func) ...: def primer(*args, **kwargs): ...: gen = func(*args, **kwargs) ...: next(gen) ...: return gen ...: ...: return primer ...: ...: ...: @coroutine ...: def simple_coroutine(num): ...: print(‘-> coroutine started :num = ‘, num) ...: a = yield num ...: print(‘-> coroutine received :a = ‘, a) ...: b = yield num + a ...: print(‘-> coroutine received :b = ‘, b) ...: In [2]: cor = simple_coroutine(10) # 第一次执行 -> coroutine started :num = 10 In [3]: cor.send("10") # 第二次执行,send(),抛出异常 -> coroutine received :a = 10 --------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-3-af6b7415b774> in <module>() ----> 1 cor.send("10") <ipython-input-1-a2f3daf9f619> in simple_coroutine(num) 17 a = yield num 18 print(‘-> coroutine received :a = ‘, a) ---> 19 b = yield num + a 20 print(‘-> coroutine received :b = ‘, b) TypeError: unsupported operand type(s) for +: ‘int‘ and ‘str‘ # ======抛出异常====== In [4]: cor.send(20) # 第三次执行 send(),由于上一次执行抛出异常,直接导致协程的终止。 --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-4-92d073bdcb96> in <module>() ----> 1 cor.send(20) StopIteration: In [5]:
上面代码是通过Ipython3
的形式运行,可以很清楚的看到,我们把一个字符串传递给变量进行加法运算,直接导致协程的终止。
其实这也是暗示了终止协程的一种方式: 发送某个值, 让协程退出。
Python官方也给了特定的处理方法:
generator.throw(type[, value[, traceback]])
在生成器暂停的位置引发 type 类型的异常,并返回该生成器函数所产生的下一个值。 如果生成器没有产生下一个值就退出,则将引发 StopIteration 异常。 如果生成器函数没有捕获传入的异常,或引发了另一个异常,则该异常会被传播给调用者。
这句话什么意思?我们通过代码来梳理一下:
@coroutine def simple_coroutine(num): print(‘-> coroutine started :num = ‘, num) try: a = yield num except Exception as e: pass else: print(‘-> coroutine received :a = ‘, a) b = yield 30 print(‘-> coroutine received :b = ‘, b) yield cor = simple_coroutine(10) print(cor.throw(Exception)) cor.send(20) >>> -> coroutine started :num = 10 >>> 30 >>> -> coroutine received :b = 20
当我们生成器抛出一个异常,在代码中我们捕获了这个异常,因此会返回下一个yield
的值,如果不存在下一个yield
的值,则将引发 StopIteration
异常,如果我们抛出了异常,但是没有在代码中捕获,则这个异常则会向上冒泡到调用者,导致异常之后的代码也会无法运行。
generator.close()
在生成器函数暂停的位置引发 GeneratorExit。 如果之后生成器函数正常退出、关闭或引发 GeneratorExit (由于未捕获该异常) 则关闭并返回其调用者。 如果生成器产生了一个值,关闭会引发 RuntimeError。 如果生成器引发任何其他异常,它会被传播给调用者。 如果生成器已经由于异常或正常退出则 close() 不会做任何事。
我们仍然通过代码来理解一下:
- 第一句:生成器函数暂停的位置引发 GeneratorExit,如果之后生成器函数正常退出、关闭或引发 GeneratorExit (由于未捕获该异常) 则关闭并返回其调用者。
@coroutine def simple_coroutine(num): try: print(‘-> coroutine started :num = ‘, num) a = yield num print(‘-> coroutine received :a = ‘, a) except GeneratorExit as e: pass cor = simple_coroutine(10) cor.close() cor.send(20) >>> -> coroutine started :num = 10 >>> Traceback (most recent call last): File "/home/ydongy/face-test/多任务/协程yield.py", line 77, in <module> cor.send(20) StopIteration
我们发现在我们捕获GeneratorExit
异常(它是由close()
自动抛出的)之后,没有继续yield
,代码直接结束,抛出的异常是StopIteration
,很正常因为生成器已经结束了,我们调用了send(20)
- 第二句:如果生成器产生了一个值,关闭会引发 RuntimeError。
@coroutine def simple_coroutine(num): try: print(‘-> coroutine started :num = ‘, num) a = yield num print(‘-> coroutine received :a = ‘, a) except GeneratorExit as e: pass yield num # 继续调用了yield cor = simple_coroutine(10) cor.close() cor.send(20) >>> -> coroutine started :num = 10 Traceback (most recent call last): File "/home/ydongy/face-test/多任务/协程yield.py", line 77, in <module> cor.close() RuntimeError: generator ignored GeneratorExit
这一次我们发现,抛出的异常在cor.close()
处,并没有在之后的send(...)
,就是因为我们在捕获异常之后又调用了yield,而且这个异常会向上冒泡,传播到我们调用方,也就是close()
之后的代码也不会运行了。
第三句:如果生成器引发任何其他异常,它会被传播给调用者。 如果生成器已经由于异常或正常退出则 close() 不会做任何事。
@coroutine def simple_coroutine(num): print(‘-> coroutine started :num = ‘, num) a = yield num print(‘-> coroutine received :a = ‘, a) yield num cor = simple_coroutine(10) cor.close() print("=======end======") # 正常退出 >>> -> coroutine started :num = 10 >>> =======end======
这次我们没有捕获close()
的异常,程序本身也没有异常(例如:我们把a = yield num
改成a = yield num + "10"
,就会抛出一个unsupported operand type(s) for +: ‘int‘ and ‘str‘
的异常,因为我们传入的值是int
,数字和字符串不能相加,就会已这个异常退出),最终程序正常退出。
让协程返回值
为了让协程返回值,就必须让协程正常终止,通过一个代码案例来分析一下:
def simple_coroutine(): count = 0 while True: term = yield if term is None: break count += 1 return count cor = simple_coroutine() print(next(cor)) print(cor.send(10)) print(cor.send(None))
这个程序第一次调用,返回一个生成器对象,然后通过nex(...)
预激,程序暂定yield
,我们通过send()
把值传递给term
同时激活协程,开始while
循环再次执行到yield
,继续暂停,等待我们再次调用send()
,只有我们send(None)
时判断不满足条件,循环结束,协程也就结束,返回结果。 一如既往, 生成器对象会抛出StopIteration
异常。 异常对象的 value 属性保存着返回的值。
注意, return 表达式的值会偷偷传给调用方, 赋值给 StopIteration 异常的一个属性。 这样做有点不合常理, 但是能保留生成器对象的常规行为——耗尽时抛出StopIteration 异常。
不过我们可以通过捕获的方式获取返回的值:
try: print(cor.send(None)) except StopIteration as e: result = e.value print(result)
为了解决这个问题PEP380定义中,yield from
结构会在内部自动捕获 StopIteration
异常。 这种处理方式与 for 循环处理 StopIteration 异常的方式一样: 循环机制使用用户易于理解的方式处理异常。 对 yield from
结构来说, 解释器不仅会捕获 StopIteration
异常, 还会把 value
属性的值变成 yield from
表达式的值。
使用yield from
首先通过一个小案例看一下yield from
的用法以及和yield
的不同:
# 传统yield用法 def gen(): for i in range(10): yield i print(list(gen())) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] # yield from 用法 def gen(): yield from range(10) print(list(gen())) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
我们发现yield
是接受一个个元素返回,而yield from
可以直接传入一个可迭代对象,从这个可迭代对象中把元素返回
- 执行原理
yield from x
表达式对 x 对象所做的第一件事是, 调用 iter(x), 从中获取迭代器。 因此, x 可以是任何可迭代的对象。
当然yield from
的作用远不仅仅是用来for
循环,yield from 的主要功能是打开双向通道, 把最外层的调用方与最内层的子生成器连接起来, 这样二者可以直接发送和产出值, 还可以直接传入异常, 而不用在位于中间的协程中添加大量处理异常的代码。
若想使用 yield from 结构, 就要大幅改动代码。 为了说明需要改动的部分, PEP 380 使用了一些专门的术语。
- 委派生成器 :包含
yield from <iterable>
表达式的生成器函数。 - 子生成器 :从
yield from 表达式中 <iterable>
部分获取的生成器。 这就是 PEP 380 的标题中所说的“子生成器”( subgenerator) 。 - 调用方 :PEP 380 使用“调用方”这个术语指代调用委派生成器的客户端代码。
结构如下图:
最后通过代码来简单实现一下这个过程:
# 子生成器 def sub_gen(): total = 0 while True: term = yield if term is None: break total += term return total # 委派生成器 def grouper(results, key): while True: results[key] = yield from sub_gen() # 调用方 def main(data): results = {} for key, values in data.items(): group = grouper(results, key) next(group) # 预激委派生成器 for value in values: group.send(value) # 通过委派生成器把值传递给子生成器的term group.send(None) # 通过委派生成器传递None给子生成器term,结束循环 return results if __name__ == ‘__main__‘: data = { "a": [10, 20, 30, 40], "b": [10, 20, 30, 40], "c": [10, 20, 30, 40], } ret = main(data) print(ret) >>> {‘a‘: 100, ‘b‘: 100, ‘c‘: 100}
先说说这段代码的主要干了啥,就是传递一个data,然后返回字典中每个键对应值列表元素的和。下面就来说说整个代码的执行过程:
- 外层 for 循环每次迭代会新建一个 grouper 实例,赋值给 group 变量; group 是委派生成器。
- 调用 next(group), 预激委派生成器 grouper,此时进入 while True 循环,调用子生成器 sub_gen 后, 在 yield from 表达式处暂停。
- 内层 for 循环调用 group.send(value), 直接把值传给子生成器 sub_gen。同时,当前的 grouper 实例( group) 在 yield from 表达式处暂停。
- 内层循环结束后, group 实例依旧在 yield from 表达式处暂停,因此,grouper函数定义体中为 results[key] 赋值的语句还没有执行。
- 如果外层 for 循环的末尾没有 group.send(None),那么 sub_gen 子生成器永远不会终止, 委派生成器 group 永远不会再次激活, 因此永远不会为 results[key]赋值。
- 外层 for 循环重新迭代时会新建一个 grouper 实例,然后绑定到 group 变量上。 前一个 grouper 实例( 以及它创建的尚未终止的 sub_gen 子生成器实例) 被垃圾回收程序回收。
相关参考:
流畅的Python
https://docs.python.org/zh-cn/3/reference/expressions.html#generator-iterator-methods
https://www.python.org/dev/peps/pep-0380/