Python协程&asyncio&异步编程
Python协程&asyncio&异步编程
1.协程
协程是微线程,是一种用户态上下文切换技术,通过一个线程实现代码块相互切换执行
实现协程有这么几种方法:
- greenlet,早期的模块
- yield 关键字
- asyncio python3.4引入的
- async、await关键字 python3.5 主流[推荐]
1.1 greenlet实现协程
pip install greenlet
# -*- coding: utf-8 -*- from greenlet import greenlet def func1(): print(1) # 第1步:输出1 gr2.switch() # 第2步:跳到func2函数 print(2) # 第5步:输出2 gr2.switch() # 第6步:跳到func2函数 def func2(): print(3) # 第3步:输出3 gr1.switch() # 第4步:跳到func1函数 print(4) # 第7步:输出4 gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch() # 第1步:去执行func1函数
1.2 yield关键字
# -*- coding: utf-8 -*- def func1(): yield 1 yield from func2() yield 2 def func2(): yield 3 yield 4 f1 = func1() for item in f1: print(item)
1.3 asyncio
Python3.4以及之后
# -*- coding: utf-8 -*- import asyncio @asyncio.coroutine def func1(): print(1) yield from asyncio.sleep(2) # 遇到IO耗时操作时,自动切换到tasks中的其它任务 print(2) @asyncio.coroutine def func2(): print(3) yield from asyncio.sleep(2) # 遇到IO耗时操作时,自动切换到tasks中的其它任务 print(4) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()), ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
注意:遇到IO阻塞自动切换
1.4 async和await关键字
Python3.5及以后
# -*- coding: utf-8 -*- import asyncio async def func1(): print(1) await asyncio.sleep(2) # 遇到IO耗时操作时,自动切换到tasks中的其它任务 print(2) async def func2(): print(3) await asyncio.sleep(2) # 遇到IO耗时操作时,自动切换到tasks中的其它任务 print(4) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()), ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
1.5 一般用greenlet或者async&await关键字
2.协程的意义
在一个线程中遇到IO等待时间,不让线程一直白白等待,而是让线程利用空闲时间去点其它的事
# -*- coding: utf-8 -*- import asyncio import aiohttp async def fetch(session, url): print("发送请求") async with session.get(url, verify_ssl=False) as response: content = await response.content.read() file_name = url.rsplit("/")[-1] with open(file_name, mode="wb") as file_object: file_object.write(content) print("下载完成", url) async def main(): async with aiohttp.ClientSession() as session: url_list = [ "http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxxa59lyj30kk10p77m.jpg", "http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxrlhlhaj30kk0dpmxj.jpg", "http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxrlrw4tj30kk0pp78u.jpg" ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] await asyncio.wait(tasks) if __name__ == "__main__": asyncio.run(main())
3.异步编程
3.1 事件循环
理解成为一个死循环
""" # 伪代码 任务列表 = [任务1, 任务2, 任务3,...] while True: 可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有任务,将‘可执行‘和‘已完成‘的任务返回 for 就绪任务 in 可执行的任务列表: 执行就绪任务 for 已完成任务 in 已完成的任务列表: 在任务列表中 移除已完成任务 任务列表中的任务都已完成 则终止循环 """
import asyncio # 去生成或获取一个事件循环 loop = asyncio.get_event_loop() # 把任务放到任务列表 loop.run_until_complete(asyncio.wait(tasks))
3.2 快速上手
协程函数 定义函数的时候 async def 函数名
协程对象 执行协程函数()得到协程对象
async def func(): pass result = func()
注意:运行协程函数得到协程对象,函数内部代码不会执行
要运行协程函数内部代码,必须要将协程对象交给事件循环来处理
import asyncio async def func(): print("Hello World") result = func() # 去生成或获取一个事件循环 loop = asyncio.get_event_loop() # 把任务放到任务列表 loop.run_until_complete(result) # asyncio.run(result) Python3.7之后,这句可以代替上面两句
3.3 await
await + 可等待对象(协程对象、Future、Task对象、-->IO等待)
示例1:
import asyncio async def func(): print("start") response = await asyncio.sleep(2) print("end", response) asyncio.run(func)
示例2:
# -*- coding: utf-8 -*- import asyncio async def others(): print("start") await asyncio.sleep(2) print("end") return "返回值" async def func(): print("开始执行func") response = await others() print("IO请求结束,结果为: ", response) asyncio.run(func())
示例3:
# -*- coding: utf-8 -*- import asyncio async def others(): print("start") await asyncio.sleep(2) print("end") return "返回值" async def func(): print("开始执行func") response1 = await others() print("IO请求结束,结果为: ", response1) response2 = await others() print("IO请求结束,结果为: ", response2) asyncio.run(func())
await就是等待对象的值得到对应的结果之后再继续往下走
3.4 Task对象
在事件循环中添加多个任务
Tasks用语并发调度协程,通过asyncio.create_task(协程对象),这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.create_task(协程对象)还可以使用较低级的loop.create_task()或ensure_future()函数。不建议手动实例化Task对象。
注意:asyncio.create_task(协程对象)是在Python3.7时加入的。在Python3.7前可以使用较低级的asyncio.ensure_future()函数。
示例1:
import asyncio async def func(): print("1") await asyncio.sleep(2) print("2") return "返回值" async def main(): print("开始执行main") # 创建Task对象,将当前执行func函数任务添加到事件循环 task1 = asyncio.create_task(func()) task2 = asyncio.create_task(func()) print("main结束") # 当执行协程遇到IO操作时,会自动化切换到其它任务(task2) # 此处的await是等待相对应的协程全部执行完毕并获取结果 ret1 = await task1 ret2 = await task2 print(ret1, ret2) asyncio.run(main())
示例2:
# -*- coding: utf-8 -*- import asyncio async def func(): print("1") await asyncio.sleep(2) print("2") return "返回值" async def main(): print("开始执行main") # 创建Task对象,将当前执行func函数任务添加到事件循环 task_list = [ asyncio.create_task(func(), name="n1"), asyncio.create_task(func(), name="n2") ] print("main结束") # 当执行协程遇到IO操作时,会自动化切换到其它任务(task2) # 此处的await是等待相对应的协程全部执行完毕并获取结果 # done是集合,是上面两个任务的返回值 done, pending = await asyncio.wait(task_list, timeout=None) print(done) asyncio.run(main())
示例3:
import asyncio async def func(): print("1") await asyncio.sleep(2) print("2") return "返回值" task_list = [ func(), func() ] done, pending = asyncio.run(asyncio.wait(task_list)) print(done)
3.5 asyncio.Future对象
Task继承Future,Task对象内部的await结果处理基于Future对象来着的
示例1:
import asyncio async def main(): # 获取当前事件循环 loop = asyncio.get_running_loop() # 创建一个future对象,这个对象什么都不干 fut = asyncio.create_future() # 等待任务最终结果(Future对象),没有结果则会一直等下去 await fut asyncio.run(main())
示例2:
import asyncio async def set_after(fut): await asyncio.sleep(2) fut.set_result("success") async def main(): # 获取当前事件循环 loop = asyncio.get_running_loop() # 创建一个future对象,这个对象什么都不干,没绑定任何行为,则这个任务不知道什么时候结束 fut = asyncio.create_future() # 创建一个任务(Task对象),绑定了set_after函数,函数在2s之后,会给fut赋值 # 即手动给fut设置结果,那么fut就可以结束了 await loop.create_task(set_after(fut)) # 等待任务最终结果(Future对象),没有结果则会一直等下去 data = await fut print(data) asyncio.run(main())
3.6 concurrent.future.Future对象
使用线程池或进程池实现异步操作时用到的对象
交叉使用,协程异步编程 + MySQL(不支持)[线程、进程做异步编程]
import time import asyncio import concurrent.futures def func(): # 某个耗时操作 time.sleep(2) return "success" async def main(): loop = asyncio.get_running_loop() # 1.Run in the default loop‘s excutor (默认ThreadPoolExecutor) # 第一步:内部会先调用ThreadPoolExecutor的submit方法去线程池中申请一个线程去执行func函数,并返回一个concurrent.futures.Future对象 # 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为调用asyncio.Future对象 # 因为concurrent.futures.Future对象不支持await语法,需要包装为asyncio.Future对象才能使用 fut = loop.run_in_excutor(None, func) result = await fut print("default thread pool", result) # 2.Run in a custom thread pool with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_excutor(pool, func) print("custom thread pool", result) # 3.Run in a custom process pool with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_excutor(pool, func) print("custom process pool", result) asyncio.run(main())
案例:asyncio + 不支持的模块
import asyncio import requests async def download_image(url): # f发送网络请求下载图片,(遇到下载图片的网络IO请求,自动切换到其它任务) print("开始下载", url) loop = asyncio.get_running_loop() # requests模块不支持异步操作,所以就使用线程池来配合实现了 future = loop.run_in_executor(None, requests.get, url) response = await future print("下载完成") # 图片保存到本地文件 file_name = url.rsplit("/")[-1] with open(file_name, mode="wb") as file_object: file_object.write(response.content) def main(): url_list = [ "http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxxa59lyj30kk10p77m.jpg", "http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxrlhlhaj30kk0dpmxj.jpg", "http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxrlrw4tj30kk0pp78u.jpg" ] tasks = [download_image(url) for url in url_list] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) if __name__ == "__main__": main()
3.7 异步迭代器
什么是异步迭代器
实现了__aiter__()和__anext__()方法的对象。anext__必须返回一个awaitable对象,async for 会处理异步迭代器的__anext()方法所返回的可等待对象,知道其引发一个StopAsyncIteration异常。
什么是异步可迭代对象
可在async for 中使用的对象。必须通过它的__aiter__()返回一个asynchronous iterator
import asyncio class Reader(object): """自定义异步迭代器(同时也是异步可迭代对象)""" def __init__(self): self.count = 0 async def readline(self): # await asyncio.sleep(2) self.count += 1 if self.count == 100: return None return self.count def __aiter__(self): return self async def __anext__(self): val = await self.readline() if val == None: raise StopAsyncIteration return val async def func(): obj = Reader() # async for只能写在协程函数内 async for item in obj: print(item) asyncio.run(func())
3.8 异步上下文管理器
此种对象通过定义__aenter__()方法和__aexit__()方法来对async with语句中的环境进行控制
import asyncio class AsyncContexManager(object): def __init__(self, ): self.conn = conn async def do_something(self): # 异步操作数据库 return 666 async def __aenter__(self): # 异步连接数据库 self.conn = await asyncio.sleep(2) return self async def __anext__(self): # 异步关闭数据库 await asyncio.sleep(2) async def func(): # async with 只能在协程函数中使用 async with AsyncContexManager() as f: result = await f.do_something() print(result) asyncio.run(func())
4.uvloop
是asyncio的事件循环的替代方案。事件循环 > asyncio的事件循环,性能比肩go
pip install uvloop
import asyncio import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy) # 编写asyncio的代码,与之前写的代码一致 # 内部的事件循环会自动变化为uvloop asyncio.run(...)
注意:一个asgi-->uvicorn 内部使用的是uvloop
5.实战案例
5.1 异步操作redis
在使用Python代码操作redis,连接、操作、断开都是网络IO
pip install aioredis
示例1:
import asyncio import aioredis async def execute(address, password): print("开始执行", address) # 网络IO 创建redis连接 redis = await aioredis.create_redis(address, password=password) # 网络IO 在redis中设置哈希值car,内部设置3个键值对 redis = {"car": {"key1": 1, "key2": 2, "key3": 3}} await redis.hmset_dict("car", key1=1, key2=2, key3=3) # 网络IO 去redis中获取值 result = redis.hgetall("car", encoding="utf-8") print(result) redis.close() # 网络IO关闭redis连接 await redis.wait_close() print("结束", address) asyncio.run(execute("redis://127.0.0.1:6379", "123456"))
示例2:
import asyncio import aioredis async def execute(address, password): print("开始执行", address) # 网络IO 先去连接47.93.4.197:6379,遇到IO则自动切换任务,去连接47.93.4.198:6379 redis = await aioredis.create_redis(address, password=password) # 网络IO 遇到IO会自动切换任务 await redis.hmset_dict("car", key1=1, key2=2, key3=3) # 网络IO 遇到IO会自动切换任务 result = redis.hgetall("car", encoding="utf-8") print(result) redis.close() # 网络IO 遇到IO会自动切换任务 await redis.wait_close() print("结束", address) task_list = [ execute("47.93.4.197:6379", "123456"), execute("47.93.4.198:6379", "123456"), ] asyncio.run(asyncio.wait(task_list))
5.1 异步操作MySQL
pip install aiomysql
示例1:
import asyncio import aiomysql async def execute(): print("开始执行") # 网络IO 连接MySQL conn = await aiomysql.connect(host="127.0.0.1", port=3306, user="root", password="123456", db="mysql") # 网络IO 创建cursor cur = await conn.cursor() # 网络IO 执行sql await cur.execute("seletc name from user") # 网络IO 获取sql结果 result = await cur.fetchall() print(result) # 网络IO 关闭连接 await cur.close() conn.close() asyncio.run(execute())
示例2:
import asyncio import aiomysql async def execute(host, password): print("开始执行", host) # 网络IO 连接MySQL先去连接47.93.41.197,遇到IO则切换去连接47.93.41.198 conn = await aiomysql.connect(host=host, port=3306, user="root", password=password, db="mysql") # 网络IO 遇到IO会自动切换任务 cur = await conn.cursor() # 网络IO 遇到IO会自动切换任务 await cur.execute("seletc name from user") # 网络IO 遇到IO会自动切换任务 result = await cur.fetchall() print(result) # 网络IO 遇到IO会自动切换任务 await cur.close() conn.close() print("结束", host) task_list = [ execute("47.93.41.197", "123456"), execute("47.93.41.198", "123456") ] asyncio.run(asyncio.wait(task_list))
5.3 FastAPI框架
安装
pip install fastapi
pip install uvicorn # (asgi 可以认为是支持异步的wsgi,内部基于uvloop)
示例:mu.py
import asyncio import aioredis import uvicorn from fastapi import FastAPI from aioredis import Redis app = FastAPI() REDIS_POOL = aioredis.ConnectionsPool("redis://47.193.14.198:6379", password="123", minsize=1, maxsize=10) @app.get("/") def index(): """普通操作接口""" return {"msg": "hello world"} @app.get("/red") async def red(): # 异步操作接口 print("请求来了") await asyncio.sleep(3) # 连接池获取一个连接 conn = await REDIS_POOL.acquire() redis = Redis(conn) # 设置值 await redis.hmset_dict("car", key1=1, key2=2, key3=3) # 读取值 result = await redis.hgetall("car", encoding="utf-8") print(result) # 连接归还连接池 REDIS_POOL.release(conn) return result if __name__ == "__main__": uvicorn.run("mu:app", host="127.0.0.1", port=5000, log_level="info")
5.4爬虫
pip install aiohttp
import asyncio import aiohttp async def fetch(session, url): print("发送请求", url) async with session.get(url, verify_ssl=False) as response: text = await response.text() print("结果:", url, len(text)) return text async def main(): async with aiohttp.ClientSession() as session: url_list = [ "https://python.org", "https://www.baidu.com", "https://tianbaoo.github.io" ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] done, pending = await asyncio.wait(tasks) print(done) if __name__ == "__main__": asyncio.run(main())
6.总结
最大的意义:通过一个线程利用其IO等待时间去做其他事情。