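A Detailed Look at asyncio Code in Python
Introduction to asyncio
Readers who know C# may be aware that it makes asynchronous programming convenient through async and await. How do you do the same in Python? Python supports asynchronous programming as well, usually through the asyncio library. Here is what asyncio is:
asyncio is a library for writing concurrent code using the async/await syntax. asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, and so on. asyncio is often a good fit for IO-bound and high-level structured network code.
Basic concepts in asyncio
As you can see, with the asyncio library we can use async and await in Python code too. asyncio has four basic concepts: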
Eventloop
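The Eventloop is the core of an asyncio application, its central controller. An Eventloop instance provides methods for registering, cancelling, and executing tasks and callbacks. Put simply, we register asynchronous functions on the event loop, and the loop runs them in turn, only one at a time. If the function currently running is waiting for I/O to return, the event loop suspends it and goes on to run other functions. Once a function's I/O has completed it becomes ready again, and it continues the next time the loop gets around to it.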
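To make the "only one at a time" behaviour concrete, here is a minimal sketch. The names worker and demo are made up for illustration, and asyncio.sleep(0) is used only as a convenient way to hand control back to the loop.

```python
import asyncio


async def worker(name, steps, log):
    for i in range(steps):
        log.append(f"{name}:{i}")
        # Give control back to the event loop so another task can run.
        await asyncio.sleep(0)


async def demo():
    log = []
    # gather schedules both coroutines on the running event loop.
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The log comes out as ['a:0', 'b:0', 'a:1', 'b:1']: each worker runs until its next await, and only then does the loop switch to the other one.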
Coroutine
A coroutine is essentially a function, declared with async def, whose execution can be suspended at an await and resumed later:

    import asyncio
    import time


    async def a():
        print('Suspending a')
        await asyncio.sleep(3)
        print('Resuming a')


    async def b():
        print('Suspending b')
        await asyncio.sleep(1)
        print('Resuming b')


    async def main():
        start = time.perf_counter()
        # Run both coroutines concurrently and wait for both to finish.
        await asyncio.gather(a(), b())
        print(f'{main.__name__} Cost: {time.perf_counter() - start}')


    if __name__ == '__main__':
        asyncio.run(main())

Running the code above produces output similar to this:
Suspending a
Suspending b
Resuming b
Resuming a
main Cost: 3.0023356619999997
For more detail on coroutines, see my earlier article "python中的协程" (Coroutines in Python). Note that the older style described there, which required a decorator, is now obsolete.
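One point that is easy to miss: calling an async def function does not run its body, it only creates a coroutine object that something else has to drive. A small sketch, with add and demo as illustrative names:

```python
import asyncio
import inspect


async def add(x, y):
    await asyncio.sleep(0)
    return x + y


def demo():
    coro = add(1, 2)                    # nothing has executed yet
    is_coro = inspect.iscoroutine(coro)
    result = asyncio.run(coro)          # the event loop runs it to completion
    return is_coro, result


if __name__ == "__main__":
    print(demo())
```

Here demo() returns (True, 3): the first value confirms that add(1, 2) was only a coroutine object, the second is the value produced once the loop ran it.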
Future
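A Future represents a "future" result, similar to a promise in JavaScript: when the asynchronous operation finishes, its final result is set on the Future object. A Future is a low-level awaitable; the subclass that actually wraps a coroutine is Task, described in the next section.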
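The idea that a result is set on the Future later can be sketched with a bare Future created by hand. This is an illustration only: demo is a made-up name, and loop.call_later stands in for whatever asynchronous operation would normally produce the value.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    done_before = future.done()
    # Pretend an asynchronous operation finishes shortly and delivers 111.
    loop.call_later(0.01, future.set_result, 111)
    result = await future               # suspends until set_result is called
    return done_before, result, future.done()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The call returns (False, 111, True): pending at first, then resolved with the value that was set on it.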
    >>> import asyncio
    >>> def fun():
    ...     print("inner fun")
    ...     return 111
    ...
    >>> loop = asyncio.get_event_loop()
    >>> future = loop.run_in_executor(None, fun)  # no await here
    inner fun
    >>> future  # the Future for fun is still pending
    <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/futures.py:348]>
    >>> future.done()  # not finished yet
    False
    >>> [m for m in dir(future) if not m.startswith('_')]
    ['add_done_callback', 'cancel', 'cancelled', 'done', 'exception', 'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result']
    >>> future.result()  # calling result() directly at this point raises an error
    Traceback (most recent call last):
      File "<input>", line 1, in <module>
    asyncio.base_futures.InvalidStateError: Result is not set.
    >>> async def runfun():
    ...     result = await future
    ...     print(result)
    ...
    >>> loop.run_until_complete(runfun())  # loop.run_until_complete(future) would also work; this just demonstrates await
    111
    >>> future
    <Future finished result=111>
    >>> future.done()
    True
    >>> future.result()
    111

Task
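Besides coroutines, the Eventloop also accepts two other kinds of objects: Future and Task. A Future object offers many task-related methods (done callbacks, cancellation, setting the result, and so on), but developers normally do not need to work with a low-level object like Future directly. Instead they use Task, a subclass of Future that wraps a coroutine and schedules it cooperatively, to achieve concurrency. So what is a Task? Here is a description:
A Future-like object that runs a Python coroutine. Not thread-safe. Tasks are used to run coroutines in event loops. If a coroutine awaits a Future, the Task suspends the execution of the coroutine and waits for the Future to complete. When the Future is done, execution of the wrapped coroutine resumes. Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task waits for a Future to complete, the event loop runs other Tasks, callbacks, or performs IO operations.
Here is how it is used: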
    >>> async def a():
    ...     print('Suspending a')
    ...     await asyncio.sleep(3)
    ...     print('Resuming a')
    ...
    >>> task = asyncio.ensure_future(a())
    >>> loop.run_until_complete(task)
    Suspending a
    Resuming a
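The session above drives the task through a loop object obtained earlier. As a hedged alternative for current Python versions, the same idea can be written with asyncio.run and asyncio.create_task. The shortened sleep and the name demo are choices made for this sketch, not part of the original example.

```python
import asyncio


async def a():
    await asyncio.sleep(0.01)
    return "A"


async def demo():
    task = asyncio.create_task(a())     # scheduled on the running loop
    is_future = isinstance(task, asyncio.Future)
    done_before = task.done()
    result = await task                 # wait for the wrapped coroutine
    return is_future, done_before, result, task.done()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The result is (True, False, 'A', True), which also shows that a Task is a Future: it starts out pending and ends up holding the coroutine's return value.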
Differences between some common asyncio usages
asyncio.gather and asyncio.wait
The code above used asyncio.gather. There is another option, asyncio.wait. Both let multiple coroutines run concurrently, so how do they differ? Let's take a look.
    >>> import asyncio
    >>> async def a():
    ...     print('Suspending a')
    ...     await asyncio.sleep(3)
    ...     print('Resuming a')
    ...     return 'A'
    ...
    ...
    ... async def b():
    ...     print('Suspending b')
    ...     await asyncio.sleep(1)
    ...     print('Resuming b')
    ...     return 'B'
    ...
    >>> async def fun1():
    ...     return_value_a, return_value_b = await asyncio.gather(a(), b())
    ...     print(return_value_a, return_value_b)
    ...
    >>> asyncio.run(fun1())
    Suspending a
    Suspending b
    Resuming b
    Resuming a
    A B
    >>> async def fun2():
    ...     done, pending = await asyncio.wait([a(), b()])
    ...     print(done)
    ...     print(pending)
    ...     task = list(done)[0]
    ...     print(task)
    ...     print(task.result())
    ...
    >>> asyncio.run(fun2())
    Suspending b
    Suspending a
    Resuming b
    Resuming a
    {<Task finished coro=<a() done, defined at <input>:1> result='A'>, <Task finished coro=<b() done, defined at <input>:8> result='B'>}
    set()
    <Task finished coro=<a() done, defined at <input>:1> result='A'>
    A
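From the code above we can see the differences between the two:
asyncio.gather collects the coroutines' results and returns them in the order the coroutines were passed in. asyncio.wait returns two items: the first is the set of completed tasks, and the second is the set of tasks still pending. (The session above was recorded on Python 3.7; since Python 3.11, asyncio.wait no longer accepts bare coroutines, so wrap them with asyncio.create_task first.)
asyncio.wait accepts a return_when parameter. By default it waits for all tasks to finish (return_when='ALL_COMPLETED'); it also supports FIRST_COMPLETED (return as soon as the first coroutine finishes) and FIRST_EXCEPTION (return as soon as the first exception is raised):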
    >>> async def fun2():
    ...     done, pending = await asyncio.wait([a(), b()], return_when=asyncio.tasks.FIRST_COMPLETED)
    ...     print(done)
    ...     print(pending)
    ...     task = list(done)[0]
    ...     print(task)
    ...     print(task.result())
    ...
    >>> asyncio.run(fun2())
    Suspending a
    Suspending b
    Resuming b
    {<Task finished coro=<b() done, defined at <input>:8> result='B'>}
    {<Task pending coro=<a() running at <input>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10757bf18>()]>>}
    <Task finished coro=<b() done, defined at <input>:8> result='B'>
    B
In most cases, asyncio.gather is sufficient.
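The two behaviours can be put side by side in one script. This is a sketch under a few assumptions: work and demo are illustrative names, the delays are shortened so it runs quickly, and the coroutines are wrapped in Tasks before being handed to asyncio.wait, which is what Python 3.11 and later require.

```python
import asyncio


async def work(name, delay):
    await asyncio.sleep(delay)
    return name


async def demo():
    # gather: results follow the argument order, not the completion order.
    gathered = await asyncio.gather(work("slow", 0.05), work("fast", 0.01))

    # wait: pass Task objects and stop as soon as one of them is done.
    tasks = [
        asyncio.create_task(work("slow", 0.2)),
        asyncio.create_task(work("fast", 0.01)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]

    # Cancel whatever is still running so nothing is left behind.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return gathered, first, len(pending)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

gather hands back ['slow', 'fast'] even though "fast" finished first, while wait returns after the fast task alone and leaves one task in pending for the caller to deal with.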
asyncio.create_task, loop.create_task, and asyncio.ensure_future
All three can create a Task. Starting with Python 3.7 you can consistently use the higher-level asyncio.create_task, which is itself built on loop.create_task. loop.create_task requires a coroutine as its argument, whereas asyncio.ensure_future also accepts a Future object or any other awaitable object:
- If the argument is a coroutine, it uses loop.create_task under the hood and returns a Task object.
- If it is a Future object, it is returned as-is.
- If it is some other awaitable object, it is wrapped in a coroutine that awaits it through its __await__ method, and ensure_future is then applied to that coroutine, which returns a Task.
So the main job of ensure_future is to guarantee that you end up with a Future object. In most cases, just use asyncio.create_task.
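The cases listed above can be checked with a few lines. This is a sketch, with coro and demo as illustrative names; it covers the coroutine and Future cases and shows that create_task rejects anything that is not a coroutine.

```python
import asyncio


async def coro():
    return 1


async def demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # A Future passed to ensure_future comes back unchanged.
    same_object = asyncio.ensure_future(fut) is fut

    # A coroutine passed to ensure_future is wrapped in a Task.
    task = asyncio.ensure_future(coro())
    is_task = isinstance(task, asyncio.Task)

    # create_task only accepts coroutines.
    try:
        asyncio.create_task(fut)
        rejected = False
    except TypeError:
        rejected = True

    fut.set_result(None)                # tidy up the hand-made Future
    return same_object, is_task, await task, rejected


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The outcome is (True, True, 1, True).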
Registering callbacks and running synchronous code
Use add_done_callback to register a callback that runs when the task completes:
    import asyncio
    import functools


    def callback(future):
        print(f'Result: {future.result()}')


    def callback2(future, n):
        print(f'Result: {future.result()}, N: {n}')


    async def funa():
        await asyncio.sleep(1)
        return "funa"


    async def main():
        task = asyncio.create_task(funa())
        task.add_done_callback(callback)
        await task
        # functools.partial lets you pass extra arguments to the callback
        task = asyncio.create_task(funa())
        task.add_done_callback(functools.partial(callback2, n=1))
        await task


    if __name__ == '__main__':
        asyncio.run(main())
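A done callback fires when the task finishes for any reason, not only on success, and calling future.result() inside it re-raises the task's exception. A more defensive callback might look like the sketch below; report, ok, boom, and demo are names invented for this example.

```python
import asyncio
import functools


def report(log, future):
    # Check how the task ended before touching result().
    if future.cancelled():
        log.append("cancelled")
    elif future.exception() is not None:
        log.append(f"failed: {future.exception()}")
    else:
        log.append(f"ok: {future.result()}")


async def ok():
    return "funa"


async def boom():
    raise ValueError("bad input")


async def demo():
    log = []
    for coro in (ok(), boom()):
        task = asyncio.create_task(coro)
        task.add_done_callback(functools.partial(report, log))
        # return_exceptions=True keeps the failure from propagating here.
        await asyncio.gather(task, return_exceptions=True)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The log ends up as ['ok: funa', 'failed: bad input'], so the same callback handles both outcomes.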
Running synchronous code
What if you have synchronous logic and want to run it concurrently with asyncio? Take a look:
    import asyncio
    import time


    def a1():
        time.sleep(1)  # blocking call; runs in the executor, not on the event loop
        return "A"


    async def b1():
        await asyncio.sleep(1)
        return "B"


    async def main():
        loop = asyncio.get_running_loop()
        await asyncio.gather(loop.run_in_executor(None, a1), b1())


    if __name__ == '__main__':
        start = time.perf_counter()
        asyncio.run(main())
        print(f'main method Cost: {time.perf_counter() - start}')

    # Output: main method Cost: 1.0050589740000002
You can use run_in_executor to run a synchronous function in an executor and get back an awaitable Future. Its first argument is a concurrent.futures.Executor instance; passing None selects the default executor.
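Instead of None, an executor can be passed explicitly. The following sketch assumes a ThreadPoolExecutor with two workers; blocking and demo are illustrative names and the sleep is shortened for the example.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking(n):
    time.sleep(0.05)                    # stands in for blocking I/O
    return n * 2


async def demo():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as pool:
        start = time.perf_counter()
        # Both blocking calls run in pool threads at the same time.
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking, 1),
            loop.run_in_executor(pool, blocking, 2),
        )
        elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The results are [2, 4], and the elapsed time is roughly that of one call rather than two. On Python 3.9 and later, asyncio.to_thread(blocking, 1) is a shorter way to run a function in the default thread pool.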
Summary
That covers asyncio code in Python: the event loop, coroutines, Future and Task, and the differences between the common ways of running them. I hope it helps; if you have any questions, feel free to leave a comment.