Python异步开发

2024-01-22

本文参考内容 https://www.bilibili.com/video/BV1dD4y127bDopen in new window

asyncio的基础是协程

使用Asyncio编程是基于协程的异步开发

  • 操作系统的调度单元是进程与线程,而协程是人为创造的
  • 用户态上下午切换技术
  • 保存当前的执行环境

协程实现方法

  • 协程库,如greenlet

    from greenlet import greenlet
    
    def func1():
        print(1)
        gr2.switch()  # 2. 执行gr2
        print(2)
        gr2.switch()  # 4. 执行gr2
    
    def func2():
        print(3)
        gr1.switch()  # 3. 执行gr1
        print(4)
    
      
    gr1 = greenlet(func1)
    gr2 = greenlet(func2)
    
    gr1.switch()  # 1. 执行gr1
    # output: 1 3 2 4 - 可巧用该方式实现复杂逻辑下的goto效果
    
  • yield - 生产中不常用

    def func1():
        yield 1
        yield from func2()
        yield 2
    
    def func2():
        yield 3
        yield 4
    
    f1 = func1()
    for i in f1:
        print(i)
    
    # output: 1 3 4 2
    
  • asyncio装饰器 py3.4

    import asyncio
    
    @asyncio.coroutine  # 协程函数
    def func1():
        print(1)
        yield from asyncio.sleep(2)  # IO耗时操作,让进程处理其他任务防止阻塞
        print(2)
    
    @asyncio.coroutine
    def fun2():
        print(3)
        yield from asyncio.sleep(2)
        print(4)
    
    # 协程函数的运行
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete( func1() )
    
    tasks = [
        asyncio.ensure_future( func1() ),
        asyncio.ensure_future( func2() )
    ]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    # 按照随机顺序执行任务列表中的任务
    # 若先fun1,output 1 3 2 4
    

    说明:当执行到yield from后面有asyncio修饰的内容时,可以切换到其他协程函数。遇到阻塞部分自动切换。如上例中,若一个操作需要2秒,则使用装饰器可并发,从而节省2秒

  • async 、 await 关键字 py3.5 - 装饰器的简化写法

    import asyncio
    
    async def func1():  # 协程函数
        print(1)
        await asyncio.sleep(2)  # 遇到await自动切换
        print(2)
    
    async def fun2():
        print(3)
        await asyncio.sleep(2)
        print(4)
    
    tasks = [
        asyncio.ensure_future( func1() ),
        asyncio.ensure_future( func2() )
    ]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    

协程的意义

统筹规划

  • 普通方式下载

    import requests
    
    def download(url):
        file_name = url.rsplit('/')[-1]
        with open(file_name, 'wb') as f:
        f.write(response.content)
    
    if __name__ == '__main__':
        url_list = [
            'http://****',
            'http://****',
            'http://****'
        ]
        for i in url_list:
            download(i)
    
  • 异步方式下载

    import asyncio
    import aiohttp
    
    
    async def download(session, url)async with session.get(url, verify_ssl=False) as resp:
            content = await resp.content.read()  # await修饰的请求是并发发出的
            file_name = url.rsplit('/')[-1]
            with open(file_name, 'wb') as f:
                f.write(content)
    
    async def run():
        async with aiohttp.clientSession() as session:
            url_list = [
                'http://****',
                'http://****',
                'http://****'
            ]
            tasks = [asyncio.create_task(download(session, url)) for url in url_list]
            await asyncio.wait(tasks)
    
    if __name__ == '__main__':
        asyncio.run(run())
    

语法详解

asynio内的事件循环

以下伪代码仅用于理解事件循环内部的运行逻辑

任务列表 = [任务1, 任务2, 任务3...]
# 列表中的任务有多种状态,未执行、正在执行、已执行等等

while True:
	可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有任务,返回状态为可执行,已完成的任务
	
	for 就绪任务 in 已准备就绪的任务列表:
		执行已就绪的任务
	
	for 已完成的任务 in 已完成的任务列表:
		在任务列表中移除已完成的任务

	如果 任务列表中的任务都完成
		break

事件循环的代码表现

import asyncio

# 获取事件循环
loop = asyncio.get_event_loop()
# 构造任务列表
loop.run_until_complete(任务)


# 以上两行的等效简化写法 - py3.7
asyncio.run()

异步编程的流程

协程函数:定义函数时使用async def 函数名

协程对象:执行协程函数时会得到的对象

# 定义协程函数
async def func():
    pass

# 获取协程对象 - 获取协程对象(简单执行协程)时,函数内部代码不会执行
result = func()

如果需要运行协程函数内部代码,需与事件循环相配合,让事件循环执行

import asyncio

async def func():
    print(1)

loop = asyncio.get_event_loop()
loop.run_until_complete( func() )
# 采用以上两行 或 下面一行
asyncio.run(func())

await关键字

await后接可等待对象。共三种类型:协程对象,future对象,task对象

import asyncio

async def func():
    print('start')
    # io阻塞 - 此时如果有其他任务,会切换执行其他任务
    # 下面的print会等待结果后再执行
    res = await asyncio.sleep(2)
    print(f'end {res}')

asyncio.run(func())

协程对象

import asyncio

async def parse():
    print('start')
    res = await asyncio.sleep(2)
    print(f'end')
    return True

async def run():
    print('run')
    # 此处await后跟协程对象
    # 若有其他任务会执行其他任务,此处只执行run函数,因此无其他任务
    # 会等待结果后再往下执行
    res = await parse()
    print(f'finish, parse res is {res}')

asyncio.run(run())

说明: await关键字后的代码需等待结果返回后才会往下执行,若是多个任务并发会自动切换其他任务。

若下一步需要上一步的执行结果,需用await修饰

Task对象

Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 完成,被打包的协程将恢复执行。(https://docs.python.org/zh-cn/3.7/library/asyncio-task.html#asyncio.Task)

在事件循环中,添加多个任务

通过asyncio.create_task(协程对象)的方式创建Task对象(py3.7)

更低层的实现方式loop.create_task()ensure_future()

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return True

async def main():
    print('run')
    
    # 创建task对象,创建即将任务添加到事件循环
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())
    
    # 此处任务已创建完毕
    print('run end')
    
    res1 = await task1  # 执行至此时,先等待结果,发现内有io,切换到其他任务
    res2 = await task2
    print(res1, res2)

asyncio.run(main())
# 事件循环共3个任务,main + func*2

采用任务列表方式。此时Task列表在协程函数内,执行创建任务前事件循环已创建。

import asyncio

async def func(a):
    print(1)
    await asyncio.sleep(2)
    print(a)
    print(2)
    return True

async def main():
    print('run')
    
    # task1 = asyncio.create_task(func())
    # task2 = asyncio.create_task(func())
    # 使用任务列表
    task_list = [
        asyncio.create_task(func(11), name='11'),
        asyncio.create_task(func(22), name='22')
    ]
    
    print('run end')
    
    # res1 = await task1
    # res2 = await task2
    # print(res1, res2)
    
    # await后只能跟协程对象,task,future,不能跟列表。列表需要构造成目标对象
    # 返回事件循环内的两个集合
    # done中是任务结果的返回值集合
    # 若传入timeout参数 asynio.wait(task_list, timeout=1) 则未完成的任务在pending中
    done, pending = await asyncio.wait(task_list, timeout=None)
	print(done)

asyncio.run(main())

创建任务不可出现在事件循环前,改用协程对象

import asyncio

async def func(a):
    print(1)
    await asyncio.sleep(2)
    print(a)
    print(2)
    return True

"""
以下写法
task_list = [
    asyncio.create_task(func(11), name='11'),
    asyncio.create_task(func(22), name='22')
]

done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
由于create_task会立即将Task加入循环,而此时循环还未创建,会报错
"""

# 改为存放协程对象
task_list = [
    func(11),
    func(22)
]

# 生成事件循环后将协程对象构造成task对象
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)

asyncio模块 future对象

Task类的父类。用于存放异步完成的结果(运行状态、返回值等),task对象内的结果来源自future。

通常不会使用future对象,以下代码仅用于表明特征

async def main():
    # 获取当前正在运行的事件循环
    loop = asynio.get_running_loop()
    
    # 创建一个future对象(任务), 不进行操作
    f = loop.create_future()
    
    # 等待结果 - 由于没有任何操作,没有结果,会一直等待
    await f

asyncio.run(main())

修改运行结果让await等待到内容

import asyncio

async def set_after(f):
    await asyncio.sleep(2)
    f.set_result('111')

async def main():
    loop = asynio.get_running_loop()
    
    f = loop.create_future()
    
    # 通过代码将future对象赋值
    await loop.create_task(set_after(f))

    res = await f
    print(res)

asyncio.run(main())

concurrent模块 future对象

与asyncio模块的future对象没有关系

使用线程池,进程池实现异步操作用到的对象

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(a):
    time.sleep(1)
    print(a)
 
pool = ThreadPoolExecutor(max_workers=5)
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
    # 一共5个线程但提交了10个任务
    # 此处是线程异步产生的future对象
    fut = pool.submit(func, 1)
    print(fut)

交叉使用:采用asyncio异步编程时,某些三方模块不支持asyncio

import time
import asyncio
import concurrent.futures

# 普通函数
def func1():
    time.sleep(2)
    return True

async def main():
    loop = asyncio.get_running_loop()
    
    # 该行代码内部有两个主要逻辑
    # 此处为普通函数。默认创建线程池,以线程方式异步。得到线程future对象
    # asyncio将线程future对象转化成asyncio协程的fut对象(通过asyncio.wrap_future)
    fut = loop.run_in_executor(None. func1)  # 此处传None将默认使用线程池
    result = await fut
    
    print('default thread pool', result)
    
    # 传统写法,上述代码的内部实现类似下方逻辑
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #   result = await loop.run_in_executor(pool, func1)
    #   print('custom thread pool', result)

asyncio.run(main())

使用进程的协程与进程交叉使用

import time
import asyncio
import concurrent.futures

# 普通函数
def func1():
    time.sleep(2)
    return True

async def main():
    loop = asyncio.get_running_loop()
    
	with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, func1)
        print('custom process pool', result)

asyncio.run(main())

实例:asyncio + requests(不支持异步模块)

import asyncio
import requests


asyncio def download(url):
    print('download start')
    loop = asyncio.get_event_loop()
    # 第二个参数为目标函数,后续参数为目标函数参数
    future = lool.run_in_executor(None, requests.get, url)
    
    resp = await future
    print('download end')
    with open(url.rsplit('/')[-1], 'wb') as f:
        f.write(resp.content)

if __name__ == '__main__':
    url_list = [
        'http://****',
        'http://****',
        'http://****'
    ]
    tasks = [download(url) for url in url_list]
    # main函数中创建协程对象,且创建与启动相连。使用低层方式启动循环
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

异步迭代器

迭代器:在内部实现__iter__()__next__()

异步迭代器:实现了__aiter__()__anext__()

异步可迭代对象:async for中被使用的对象,__aiter__()方法返回asynchronous iteratior

import asyncio

class Reader:
    def __init__(self):
        self.count = 0
  
    async def readline(self):  # 该方法在下方的调用处有await修饰,需async声明
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count
      
    def __aiter__(self):  # 有该方法表明该类为一个迭代器,该方法需返回一个可迭代对象
        return self
  
    async def __anext__(self):
        val = await self.readline()  # 协程对象
        if val == None:  # 迭代器终止
            raise StopAsyncIteration
        return val


async def run():
	obj = Reader()
	async for item in obj:  # async for 必须在协程函数内
  	    print(item)

asyncio.run(run())

在声明某个对象时,若该对象操作存在io阻塞,可使用

异步上下文管理器

上下文管理器:with,内有__enter__()__exit__()方法

异步上下文管理器对象通过定义__aenter__()__aexit__(),对async with中的环境进行控制

class AsyncContextManager:
    def __init__(self):
        self.conn = conn

    async def do_something(self):
        # 异步操作数据库
        return True

    async def __aenter__(self):
        # 异步连接数据库
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_tyoe, exc, tb):
        # 异步关闭数据库连接
        await asyncio.sleep(1)

async def func():
  # 实例化这个上下文管理器 - async with 必须嵌套进异步函数内
    async with AsyncContextManager() as obj:
        result = await f.do_something()
        print(result)

asyncio.run(func())

uvloop

asyncio事件循环的替代方案。事件循环效率 大于 默认asyncio的事件循环。(提高性能使用)

pip3 install uvloop

基本使用 - 只需在开始前声明循环方式为uvloop,其余与原先写法不变

import asyncio
import uvloop
# 只需在原有基础上声明事件循环策略使用uvloop
asynico.set_event_loop_policy(uvloop.EventLoopPolicy())

# 原有代码

# 原有运行方式
asyncio.run(...)

注意:异步网络服务(django3、fastapi)asgi(wsgi的异步版本)内部使用了uvicorn,而uvicorn内使用了uvloop

实例

redis

python代码连接/读写/断开redis都是用网络IO

pip3 install aioredis

实例1:只有redis操作任务,遇到io加上await关键字

import ayncio
import aioredis

async def execute(address, password):
    print('start: ', address)
    # 网络IO操作: 连接
    redis = await aioredis.create_redis(address, password=password)
    
    # 网络IO操作: redis新增数据car:{key1: 1, key2: 2, key3: 3}
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    
    # 网络IO操作: 去redis中获取值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    
    redis.close()
    # 网络IO操作: 断开
    await redis.wait_closed()
    
    print('end:', address)

asyncio.run( execute('redis://47.93.4.198:6379', 'root12345') )

实例2:多个任务下

import ayncio
import aioredis

async def execute(address, password):
    print('start: ', address)
    # 网络IO操作: 连接
    redis = await aioredis.create_redis(address, password=password)
    
    # 网络IO操作: redis新增数据car:{key1: 1, key2: 2, key3: 3}
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    
    # 网络IO操作: 去redis中获取值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    
    redis.close()
    # 网络IO操作: 断开
    await redis.wait_closed()
    
    print('end:', address)


task_list = [
    execute('redis://47.93.4.197:6379', 'root12345'),
    execute('redis://47.93.4.198:6379', 'root12345')
]
  
asyncio.run( asyncio.wait(task_list) )

MySQL

pip3 install aiomysql

实例1

import asyncio
import aiomysql

async def execute(host, password):
    print('start: ', host)
    # 网络IO操场:先连任务1的地址,遇到io自动连任务2的地址
    conn = await aoimysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
    
    # 网络IO操场:遇到IO自动切换任务
    cur = await conn.cursor()
    
    # 网络IO操场:遇到IO自动切换任务
    await cur.execute('SELECT Host,User FROM user')
    
    # 网络IO操场:遇到IO自动切换任务
    result = await cur.fetchall()
    print(result)
    
    # 网络IO操场:遇到IO自动切换任务
    await cur.close()
    conn.close()
    print('start: ', host)
  
task_list = [
    execute('47.93.41.197', 'root12345'),
    execute('47.93.40.197', 'root12345')
]

asyncio.run( asyncio.wait(task_list) )

web框架

Fastapi为例,说明web框架中异步流程

pip3 install fastapi
pip3 install uvicorn  # fastapi运行 - asgi(wsgi的异步版本)

实例一:普通接口的写法与flask类似

import asyncio
import uvicorn
from fastapi import FastAPI()

app = FaskAPI()

@app.get('/')
def index():
    # 两个人同时访问该接口,排队执行 - 同步方式
    return {'data': 'hello world'}

if __name__ == '_main_':
    uvicorn.run('test:app', host='127.0.0.1', port=5000, log_level='info')

实例二:异步接口。遇到await关键字自动处理下一个请求

import aioredis
import uvicorn
from aioredis import Redis
from fastapi import FastAPI

app = FastAPI()

# 连接池
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password='root123', minsize=1, maxsize=10)

@app.get('/red')
async def red():
    """异步接口"""
    print('start')
    # 遇到io,切换到下一个请求
    await asyncio.sleep(3)
    conn = await REDIS_POOL.acquire()
    redis = Redis(conn)
    
    # 网络IO操作: redis新增数据car:{key1: 1, key2: 2, key3: 3}
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    
    # 网络IO操作: redis读取数据
    result = await redis.hget('car', encoding='utf-8')
    print(result)
    
    # 连接归还
    REDIS_POOL.release(conn)
    
    return result

if __name__ == '_main_':
    # python文件名test.py内的app对象运行
    uvicorn.run('test:app', host='127.0.0.1', port=5000, log_level='info')

爬虫

pip3 install aiohttp

实例

import aiohttp
import asyncio

async def fetch(session.url):
    print('req url is ', url)
    async with session.get(url, verify_ssl=False) as response:
        text = await response.text()
        print('resp is ', url, len(text))
        return text

async def main():
    url_list = [
        'https://1',
        'https://2',
        'https://3',
    ]
    tasks = [asyncop.create_task(fetch(session, url)) for url in url_list]
    done, pending = await saynco.wait(tasks)

if __name__ == '__main__':
    asyncio.run( main() )

总结

运用更少的资源(协程而非线程或进程)实现并发

通过一个线程利用IO等待事件去做其他事情