使用async关键字的原生协程

异步生成器(在async def中使用yield),例如:

1
2
async def async_gen():
    yield 1

协程函数(即协程),例如:

1
2
3
async def cor():
    res = await awaitable()
    return res 

其中awaitable()指的是实现了__await__()协议的对象

协程的一个例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio

async def cor_1(n):
    return list(range(n))

async def cor_2(n):
    return list(range(n))

async def chain(m, n):
    r1 = await cor_1(m)
    r2 = await cor_2(n)
    print(r1, r2)

async def main():
    await asyncio.gather(chain(1, 2))

if __name__ == '__main__':
    asyncio.run(main())

asyncio

python原生协程(区别于基于生成器的协程)和协程标准库asyncio高度相关,现在通过一个来自real python的生产消费者的例子(稍微修改了一下,使得更紧凑)来入门协程的一些基本写法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import asyncio
import time
import os
from random import randint

async def make_item(size=5):
    return os.urandom(size).hex()

async def producer(idx, que):
    n = randint(0, 5)
    for _ in range(10):
        await asyncio.sleep(n)
        item = await make_item()
        t = time.perf_counter()
        await que.put((item, t))
        print(f'Producer {idx} added item<{item}> to queue, cost {n} seconds')


async def consumer(idx, que):
    while True:
        await asyncio.sleep(randint(0, 5))
        item, t = await que.get()
        print(f'Consumer {idx} got item<{item}> in {time.perf_counter()-t:0.5f} seconds')
        que.task_done()


async def main(n_prod, n_con):

    que = asyncio.Queue()

    producers = [
        asyncio.create_task(producer(idx, que)) for idx in range(n_prod)
    ]
    consumers = [
        asyncio.create_task(consumer(idx, que)) for idx in range(n_con)
    ]

    await asyncio.gather(*producers)
    await que.join()

    for c in consumers:
        c.cancel()

if __name__ == '__main__':
    asyncio.run(main(5, 10))

相关函数的使用方法参考如下:

asyncio常用函数

1. asyncio.run(coro, *, debug=False)

  • 此函数会运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。

  • 当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。

  • 如果 debug 为 True,事件循环将以调试模式运行。

  • 此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。

2. 使用asyncio.create_task(coro, *, name=None)创建Task

  • 将coro协程 封装为一个Task并调度其执行。返回Task对象。

  • 如果name不为None,它将使用Task.set_name()来设为Task的名称。

  • 该任务会在get_running_loop()返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。

3. 使用asyncio.gather(*aws, loop=None, return_exceptions=False)并发运行Task

  • gather()会并发运行aws序列中的awaitable。

  • 如果aws中的某个awaitable为协程,它将自动被作为一个任务调度

  • 如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与aws中可等待对象的顺序一致

  • 如果return_exceptions为 False (默认),所引发的首个异常会立即传播给等待gather()的任务.aws序列中的其他可等待对象 不会被取消 并将继续运行。

  • 如果return_exceptions为 True,异常会和成功的结果一样处理,并聚合至结果列表

  • 如果gather()被取消,所有被提交 (尚未完成) 的可等待对象也会被取消。

  • 如果aws序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 – 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。

4. asyncio.sleep(delay, result=None, *, loop=None)

asyncio.sleep()会挂起当前任务,以允许其他任务进行

5. 使用asyncio.shield(aw, *, loop=None)屏蔽取消操作

保护一个awaitable防止其被取消。如果aw是一个协程,它将被自动作为任务调度

1
2
async def cor():
    res = await asyncio.shield(something())
  • 若cor()被取消,则something()不会被取消,但something()返回到await语句时会触发CancelledError.
  • 即使使用了sheild(),若something()内部自己取消了,something()仍旧会被取消

6. asyncio.wait_for(aw, timeout, *, loop=None)

  • 等待aw可等待对象完成,指定timeout秒后超时
  • 如果aw是一个协程,则会被自动作为Task调度
  • timeout可以为None(表示等待直到完成),float,int,单位为秒
  • 超时将取消Task并引发asyncio.TimeoutError

7. asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

  • 并发地运行aws可迭代对象中的awaitable并进入阻塞状态直到满足return_when所指定的条件。
  • 返回两个Task/Future集合: (done, pending)。
  • 如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。
  • wait_for()不同,wait()在超时发生时不会取消可等待对象
  • return_when 指定此函数应在何时返回。它必须为以下常数之一:
常数 描述
FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于ALL_COMPLETED
ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。

8. asyncio.as_completed(aws, *, loop=None, timeout=None)

  • 并发地运行aws可迭代对象中的可等待对象。返回一个协程的迭代器。 所返回的每个协程可被等待以从剩余的可等待对象的可迭代对象中获得最早的下一个结果。

  • 如果在所有Future对象完成前发生超时则将引发asyncio.TimeoutError

1
2
fro coro in asyncio.as_completed(aws):
    earliest_result = await coro

9. asyncio.to_thread(func, /, *args, **kwargs)

asyncio.to_thread()的大致思想是,将会阻塞事件循环的IO bound的代码新开线程运行,由于IO bound不受限于GIL,这使得不仅当前事件循环所在的线程没有被阻塞,新开的线程也没有被阻塞

此函数提供的任何*args**kwargs会被直接传给 func。 并且,当前contextvars.Context会被传播,允许在不同的线程中访问来自事件循环的上下文变量

返回一个可被等待以获取 func 的最终结果的协程。

用例子来说明:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import time
import asyncio

def blocking_io():
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1)
    )

    print(f"finished main at {time.strftime('%X')}")

asyncio.run(main())

在任何协程中直接调用 blocking_io() 将会在调用期间阻塞事件循环,导致额外的 1 秒运行时间。 而通过改用 asyncio.to_thread(),我们可以在不同的线程中运行它从而不会阻塞事件循环。

待续