Python异步开发
本文参考内容 https://www.bilibili.com/video/BV1dD4y127bD
asyncio的基础是协程
使用Asyncio编程是基于协程的异步开发
- 操作系统的调度单元是进程与线程,而协程是人为创造的
- 用户态上下午切换技术
- 保存当前的执行环境
协程实现方法
协程库,如greenlet
from greenlet import greenlet def func1(): print(1) gr2.switch() # 2. 执行gr2 print(2) gr2.switch() # 4. 执行gr2 def func2(): print(3) gr1.switch() # 3. 执行gr1 print(4) gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch() # 1. 执行gr1 # output: 1 3 2 4 - 可巧用该方式实现复杂逻辑下的goto效果
yield - 生产中不常用
def func1(): yield 1 yield from func2() yield 2 def func2(): yield 3 yield 4 f1 = func1() for i in f1: print(i) # output: 1 3 4 2
asyncio装饰器 py3.4
import asyncio @asyncio.coroutine # 协程函数 def func1(): print(1) yield from asyncio.sleep(2) # IO耗时操作,让进程处理其他任务防止阻塞 print(2) @asyncio.coroutine def fun2(): print(3) yield from asyncio.sleep(2) print(4) # 协程函数的运行 # loop = asyncio.get_event_loop() # loop.run_until_complete( func1() ) tasks = [ asyncio.ensure_future( func1() ), asyncio.ensure_future( func2() ) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) # 按照随机顺序执行任务列表中的任务 # 若先fun1,output 1 3 2 4
说明:当执行到
yield from
后面有asyncio
修饰的内容时,可以切换到其他协程函数。遇到阻塞部分自动切换。如上例中,若一个操作需要2秒,则使用装饰器可并发,从而节省2秒async 、 await 关键字 py3.5 - 装饰器的简化写法
import asyncio async def func1(): # 协程函数 print(1) await asyncio.sleep(2) # 遇到await自动切换 print(2) async def fun2(): print(3) await asyncio.sleep(2) print(4) tasks = [ asyncio.ensure_future( func1() ), asyncio.ensure_future( func2() ) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
协程的意义
统筹规划
普通方式下载
import requests def download(url): file_name = url.rsplit('/')[-1] with open(file_name, 'wb') as f: f.write(response.content) if __name__ == '__main__': url_list = [ 'http://****', 'http://****', 'http://****' ] for i in url_list: download(i)
异步方式下载
import asyncio import aiohttp async def download(session, url): async with session.get(url, verify_ssl=False) as resp: content = await resp.content.read() # await修饰的请求是并发发出的 file_name = url.rsplit('/')[-1] with open(file_name, 'wb') as f: f.write(content) async def run(): async with aiohttp.clientSession() as session: url_list = [ 'http://****', 'http://****', 'http://****' ] tasks = [asyncio.create_task(download(session, url)) for url in url_list] await asyncio.wait(tasks) if __name__ == '__main__': asyncio.run(run())
语法详解
asynio内的事件循环
以下伪代码仅用于理解事件循环内部的运行逻辑
任务列表 = [任务1, 任务2, 任务3...]
# 列表中的任务有多种状态,未执行、正在执行、已执行等等
while True:
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有任务,返回状态为可执行,已完成的任务
for 就绪任务 in 已准备就绪的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除已完成的任务
如果 任务列表中的任务都完成
break
事件循环的代码表现
import asyncio
# 获取事件循环
loop = asyncio.get_event_loop()
# 构造任务列表
loop.run_until_complete(任务)
# 以上两行的等效简化写法 - py3.7
asyncio.run()
异步编程的流程
协程函数:定义函数时使用async def 函数名
协程对象:执行协程函数时会得到的对象
# 定义协程函数
async def func():
pass
# 获取协程对象 - 获取协程对象(简单执行协程)时,函数内部代码不会执行
result = func()
如果需要运行协程函数内部代码,需与事件循环相配合,让事件循环执行
import asyncio
async def func():
print(1)
loop = asyncio.get_event_loop()
loop.run_until_complete( func() )
# 采用以上两行 或 下面一行
asyncio.run(func())
await关键字
await后接可等待对象。共三种类型:协程对象,future对象,task对象
import asyncio
async def func():
print('start')
# io阻塞 - 此时如果有其他任务,会切换执行其他任务
# 下面的print会等待结果后再执行
res = await asyncio.sleep(2)
print(f'end {res}')
asyncio.run(func())
协程对象
import asyncio
async def parse():
print('start')
res = await asyncio.sleep(2)
print(f'end')
return True
async def run():
print('run')
# 此处await后跟协程对象
# 若有其他任务会执行其他任务,此处只执行run函数,因此无其他任务
# 会等待结果后再往下执行
res = await parse()
print(f'finish, parse res is {res}')
asyncio.run(run())
说明: await关键字后的代码需等待结果返回后才会往下执行,若是多个任务并发会自动切换其他任务。
若下一步需要上一步的执行结果,需用await修饰
Task对象
Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 完成,被打包的协程将恢复执行。(https://docs.python.org/zh-cn/3.7/library/asyncio-task.html#asyncio.Task)
在事件循环中,添加多个任务
通过asyncio.create_task(协程对象)
的方式创建Task对象(py3.7)
更低层的实现方式loop.create_task()
或ensure_future()
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return True
async def main():
print('run')
# 创建task对象,创建即将任务添加到事件循环
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())
# 此处任务已创建完毕
print('run end')
res1 = await task1 # 执行至此时,先等待结果,发现内有io,切换到其他任务
res2 = await task2
print(res1, res2)
asyncio.run(main())
# 事件循环共3个任务,main + func*2
采用任务列表方式。此时Task列表在协程函数内,执行创建任务前事件循环已创建。
import asyncio
async def func(a):
print(1)
await asyncio.sleep(2)
print(a)
print(2)
return True
async def main():
print('run')
# task1 = asyncio.create_task(func())
# task2 = asyncio.create_task(func())
# 使用任务列表
task_list = [
asyncio.create_task(func(11), name='11'),
asyncio.create_task(func(22), name='22')
]
print('run end')
# res1 = await task1
# res2 = await task2
# print(res1, res2)
# await后只能跟协程对象,task,future,不能跟列表。列表需要构造成目标对象
# 返回事件循环内的两个集合
# done中是任务结果的返回值集合
# 若传入timeout参数 asynio.wait(task_list, timeout=1) 则未完成的任务在pending中
done, pending = await asyncio.wait(task_list, timeout=None)
print(done)
asyncio.run(main())
创建任务不可出现在事件循环前,改用协程对象
import asyncio
async def func(a):
print(1)
await asyncio.sleep(2)
print(a)
print(2)
return True
"""
以下写法
task_list = [
asyncio.create_task(func(11), name='11'),
asyncio.create_task(func(22), name='22')
]
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
由于create_task会立即将Task加入循环,而此时循环还未创建,会报错
"""
# 改为存放协程对象
task_list = [
func(11),
func(22)
]
# 生成事件循环后将协程对象构造成task对象
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
asyncio模块 future对象
Task类的父类。用于存放异步完成的结果(运行状态、返回值等),task对象内的结果来源自future。
通常不会使用future对象,以下代码仅用于表明特征
async def main():
# 获取当前正在运行的事件循环
loop = asynio.get_running_loop()
# 创建一个future对象(任务), 不进行操作
f = loop.create_future()
# 等待结果 - 由于没有任何操作,没有结果,会一直等待
await f
asyncio.run(main())
修改运行结果让await
等待到内容
import asyncio
async def set_after(f):
await asyncio.sleep(2)
f.set_result('111')
async def main():
loop = asynio.get_running_loop()
f = loop.create_future()
# 通过代码将future对象赋值
await loop.create_task(set_after(f))
res = await f
print(res)
asyncio.run(main())
concurrent模块 future对象
与asyncio模块的future对象没有关系
使用线程池,进程池实现异步操作用到的对象
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(a):
time.sleep(1)
print(a)
pool = ThreadPoolExecutor(max_workers=5)
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
# 一共5个线程但提交了10个任务
# 此处是线程异步产生的future对象
fut = pool.submit(func, 1)
print(fut)
交叉使用:采用asyncio异步编程时,某些三方模块不支持asyncio
import time
import asyncio
import concurrent.futures
# 普通函数
def func1():
time.sleep(2)
return True
async def main():
loop = asyncio.get_running_loop()
# 该行代码内部有两个主要逻辑
# 此处为普通函数。默认创建线程池,以线程方式异步。得到线程future对象
# asyncio将线程future对象转化成asyncio协程的fut对象(通过asyncio.wrap_future)
fut = loop.run_in_executor(None. func1) # 此处传None将默认使用线程池
result = await fut
print('default thread pool', result)
# 传统写法,上述代码的内部实现类似下方逻辑
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(pool, func1)
# print('custom thread pool', result)
asyncio.run(main())
使用进程的协程与进程交叉使用
import time
import asyncio
import concurrent.futures
# 普通函数
def func1():
time.sleep(2)
return True
async def main():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, func1)
print('custom process pool', result)
asyncio.run(main())
实例:asyncio + requests(不支持异步模块)
import asyncio
import requests
asyncio def download(url):
print('download start')
loop = asyncio.get_event_loop()
# 第二个参数为目标函数,后续参数为目标函数参数
future = lool.run_in_executor(None, requests.get, url)
resp = await future
print('download end')
with open(url.rsplit('/')[-1], 'wb') as f:
f.write(resp.content)
if __name__ == '__main__':
url_list = [
'http://****',
'http://****',
'http://****'
]
tasks = [download(url) for url in url_list]
# main函数中创建协程对象,且创建与启动相连。使用低层方式启动循环
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
异步迭代器
迭代器:在内部实现__iter__()
和__next__()
异步迭代器:实现了__aiter__()
与__anext__()
异步可迭代对象:async for
中被使用的对象,__aiter__()
方法返回asynchronous iteratior
import asyncio
class Reader:
def __init__(self):
self.count = 0
async def readline(self): # 该方法在下方的调用处有await修饰,需async声明
# await asyncio.sleep(1)
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self): # 有该方法表明该类为一个迭代器,该方法需返回一个可迭代对象
return self
async def __anext__(self):
val = await self.readline() # 协程对象
if val == None: # 迭代器终止
raise StopAsyncIteration
return val
async def run():
obj = Reader()
async for item in obj: # async for 必须在协程函数内
print(item)
asyncio.run(run())
在声明某个对象时,若该对象操作存在io阻塞,可使用
异步上下文管理器
上下文管理器:with
,内有__enter__()
与__exit__()
方法
异步上下文管理器对象通过定义__aenter__()
与__aexit__()
,对async with
中的环境进行控制
class AsyncContextManager:
def __init__(self):
self.conn = conn
async def do_something(self):
# 异步操作数据库
return True
async def __aenter__(self):
# 异步连接数据库
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_tyoe, exc, tb):
# 异步关闭数据库连接
await asyncio.sleep(1)
async def func():
# 实例化这个上下文管理器 - async with 必须嵌套进异步函数内
async with AsyncContextManager() as obj:
result = await f.do_something()
print(result)
asyncio.run(func())
uvloop
asyncio事件循环的替代方案。事件循环效率 大于 默认asyncio的事件循环。(提高性能使用)
pip3 install uvloop
基本使用 - 只需在开始前声明循环方式为uvloop,其余与原先写法不变
import asyncio
import uvloop
# 只需在原有基础上声明事件循环策略使用uvloop
asynico.set_event_loop_policy(uvloop.EventLoopPolicy())
# 原有代码
# 原有运行方式
asyncio.run(...)
注意:异步网络服务(django3、fastapi)asgi(wsgi的异步版本)内部使用了uvicorn
,而uvicorn
内使用了uvloop
实例
redis
python代码连接/读写/断开redis都是用网络IO
pip3 install aioredis
实例1:只有redis操作任务,遇到io加上await
关键字
import ayncio
import aioredis
async def execute(address, password):
print('start: ', address)
# 网络IO操作: 连接
redis = await aioredis.create_redis(address, password=password)
# 网络IO操作: redis新增数据car:{key1: 1, key2: 2, key3: 3}
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO操作: 去redis中获取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 网络IO操作: 断开
await redis.wait_closed()
print('end:', address)
asyncio.run( execute('redis://47.93.4.198:6379', 'root12345') )
实例2:多个任务下
import ayncio
import aioredis
async def execute(address, password):
print('start: ', address)
# 网络IO操作: 连接
redis = await aioredis.create_redis(address, password=password)
# 网络IO操作: redis新增数据car:{key1: 1, key2: 2, key3: 3}
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO操作: 去redis中获取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 网络IO操作: 断开
await redis.wait_closed()
print('end:', address)
task_list = [
execute('redis://47.93.4.197:6379', 'root12345'),
execute('redis://47.93.4.198:6379', 'root12345')
]
asyncio.run( asyncio.wait(task_list) )
MySQL
pip3 install aiomysql
实例1
import asyncio
import aiomysql
async def execute(host, password):
print('start: ', host)
# 网络IO操场:先连任务1的地址,遇到io自动连任务2的地址
conn = await aoimysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
# 网络IO操场:遇到IO自动切换任务
cur = await conn.cursor()
# 网络IO操场:遇到IO自动切换任务
await cur.execute('SELECT Host,User FROM user')
# 网络IO操场:遇到IO自动切换任务
result = await cur.fetchall()
print(result)
# 网络IO操场:遇到IO自动切换任务
await cur.close()
conn.close()
print('start: ', host)
task_list = [
execute('47.93.41.197', 'root12345'),
execute('47.93.40.197', 'root12345')
]
asyncio.run( asyncio.wait(task_list) )
web框架
Fastapi为例,说明web框架中异步流程
pip3 install fastapi
pip3 install uvicorn # fastapi运行 - asgi(wsgi的异步版本)
实例一:普通接口的写法与flask类似
import asyncio
import uvicorn
from fastapi import FastAPI()
app = FaskAPI()
@app.get('/')
def index():
# 两个人同时访问该接口,排队执行 - 同步方式
return {'data': 'hello world'}
if __name__ == '_main_':
uvicorn.run('test:app', host='127.0.0.1', port=5000, log_level='info')
实例二:异步接口。遇到await关键字自动处理下一个请求
import aioredis
import uvicorn
from aioredis import Redis
from fastapi import FastAPI
app = FastAPI()
# 连接池
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password='root123', minsize=1, maxsize=10)
@app.get('/red')
async def red():
"""异步接口"""
print('start')
# 遇到io,切换到下一个请求
await asyncio.sleep(3)
conn = await REDIS_POOL.acquire()
redis = Redis(conn)
# 网络IO操作: redis新增数据car:{key1: 1, key2: 2, key3: 3}
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO操作: redis读取数据
result = await redis.hget('car', encoding='utf-8')
print(result)
# 连接归还
REDIS_POOL.release(conn)
return result
if __name__ == '_main_':
# python文件名test.py内的app对象运行
uvicorn.run('test:app', host='127.0.0.1', port=5000, log_level='info')
爬虫
pip3 install aiohttp
实例
import aiohttp
import asyncio
async def fetch(session.url):
print('req url is ', url)
async with session.get(url, verify_ssl=False) as response:
text = await response.text()
print('resp is ', url, len(text))
return text
async def main():
url_list = [
'https://1',
'https://2',
'https://3',
]
tasks = [asyncop.create_task(fetch(session, url)) for url in url_list]
done, pending = await saynco.wait(tasks)
if __name__ == '__main__':
asyncio.run( main() )
总结
运用更少的资源(协程而非线程或进程)实现并发
通过一个线程利用IO等待事件去做其他事情