Developing with asyncio

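Asynchronous programming is different from classic "sequential" programming.

This page lists common mistakes and traps and explains how to avoid them.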

Debug Mode

By default asyncio runs in production mode. In order to ease development, asyncio also has a debug mode.

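There are several ways to enable asyncio debug mode:

  • Setting the PYTHONASYNCIODEBUG environment variable to 1.

  • Using the Python Development Mode.

  • Passing debug=True to asyncio.run().

  • Calling loop.set_debug().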

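In addition to enabling debug mode, also consider:

  • setting the log level of the asyncio logger to logging.DEBUG. For example, the following snippet of code can be run at startup of the application: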

    logging.basicConfig(level=logging.DEBUG)
    
  • configuring the warnings module to display ResourceWarning warnings. One way of doing that is by using the -W default command line option.
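The settings above can be combined in a small script. This is a minimal sketch. It enables debug mode by passing debug=True to asyncio.run() and configures logging. It also enables ResourceWarning with warnings.simplefilter(), an in-code alternative to -W default. The main() coroutine is illustrative only: it reports whether the running loop is in debug mode.

```python
import asyncio
import logging
import warnings

# Show asyncio's DEBUG-level log records (for example, the selector in use).
logging.basicConfig(level=logging.DEBUG)
# Display ResourceWarning, which is ignored by default.
warnings.simplefilter("default", ResourceWarning)

async def main():
    # Report whether the running event loop is in debug mode.
    return asyncio.get_running_loop().get_debug()

debug_enabled = asyncio.run(main(), debug=True)
print("debug mode:", debug_enabled)
```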

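When debug mode is enabled:

  • Many non-threadsafe asyncio APIs raise an exception if they are called from the wrong thread. Examples are the loop.call_soon() and loop.call_at() methods.

  • The execution time of the I/O selector is logged if an I/O operation takes too long.

  • Callbacks taking longer than 100 milliseconds are logged. The loop.slow_callback_duration attribute sets the minimum execution duration, in seconds, that is considered "slow".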
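The sketch below shows the slow-callback report in action. It lowers loop.slow_callback_duration to 50 ms and then deliberately blocks the event loop with time.sleep().

ListHandler is a hypothetical helper that collects the messages emitted by the "asyncio" logger. The exact wording of the warning may vary between Python versions, so the check only looks for the word "took".

```python
import asyncio
import logging
import time

messages = []

class ListHandler(logging.Handler):
    # Hypothetical helper: collect messages emitted by the "asyncio" logger.
    def emit(self, record):
        messages.append(record.getMessage())

logging.getLogger("asyncio").addHandler(ListHandler())

async def main():
    loop = asyncio.get_running_loop()
    # Treat any callback or task step longer than 50 ms as "slow".
    loop.slow_callback_duration = 0.05
    # Deliberately blocking call: this task step takes about 100 ms.
    time.sleep(0.1)

asyncio.run(main(), debug=True)
slow_reports = [m for m in messages if "took" in m]
print(slow_reports)
```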

Concurrency and Multithreading

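An event loop runs in a thread (typically the main thread) and executes all callbacks and Tasks in its thread. While a Task is running in the event loop, no other Tasks can run in the same thread. When a Task executes an await expression, the running Task is suspended and the event loop executes the next Task.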

To schedule a callback from another OS thread, use the loop.call_soon_threadsafe() method. Example:

loop.call_soon_threadsafe(callback, *args)

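Almost all asyncio objects are not thread safe. This is typically not a problem unless there is code that works with them from outside of a Task or a callback. If such code needs to call a low-level asyncio API, use the loop.call_soon_threadsafe() method, e.g.: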

loop.call_soon_threadsafe(fut.cancel)
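The sketch below shows the call_soon_threadsafe() pattern end to end: a worker thread hands a result back to a Future owned by the event loop. The worker() function and the result string are illustrative assumptions. The key point is that the thread never calls fut.set_result() directly. Instead, it asks the loop to make that call in the loop's own thread.

```python
import asyncio
import threading

def worker(loop, fut):
    # Runs in a separate OS thread. Calling fut.set_result() here directly
    # would not be thread-safe, so schedule it on the loop's thread instead.
    loop.call_soon_threadsafe(fut.set_result, "done in worker thread")

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    thread = threading.Thread(target=worker, args=(loop, fut))
    thread.start()
    result = await fut  # resumes once the loop has run fut.set_result()
    thread.join()
    return result

result = asyncio.run(main())
print(result)
```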

To schedule a coroutine object from a different OS thread, use the run_coroutine_threadsafe() function. It returns a concurrent.futures.Future that gives access to the result:

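import asyncio

async def coro_func():
    return await asyncio.sleep(1, 42)

# Later, in another OS thread
# (loop is the event loop running in the original thread):

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()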
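A self-contained version of the snippet above can run the event loop in a background thread. The main thread then plays the role of "another OS thread". This is a minimal sketch that assumes a dedicated loop created with asyncio.new_event_loop() and driven by loop.run_forever(). The one-second delay is shortened to 0.1 s.

```python
import asyncio
import threading

async def coro_func():
    return await asyncio.sleep(0.1, 42)

# Run an event loop in a background thread.
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()

# From the main thread, submit the coroutine to that loop.
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
result = future.result(timeout=5)  # blocks this thread, not the loop
print(result)

# Stop the loop from outside its thread, then clean up.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
```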

To handle signals, the event loop must run in the main thread.

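The loop.run_in_executor() method can be used with a concurrent.futures.ThreadPoolExecutor or a concurrent.futures.InterpreterPoolExecutor. This executes blocking code in a different OS thread without blocking the OS thread that the event loop runs in.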
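Here is a minimal sketch of offloading blocking calls to a thread pool with loop.run_in_executor(). The blocking_io() function is a hypothetical stand-in for any blocking call; time.sleep() keeps the example self-contained.

```python
import asyncio
import concurrent.futures
import time

def blocking_io(n):
    # Hypothetical blocking function: runs in a pool thread, not the loop thread.
    time.sleep(0.1)
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        # Each call returns an asyncio Future that can be awaited or gathered.
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_io, n) for n in range(4))
        )
    return results

results = asyncio.run(main())
print(results)
```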

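There is currently no way to schedule coroutines or callbacks directly from a different process, such as one started with multiprocessing. The Event loop methods section lists APIs that can read from pipes and watch file descriptors without blocking the event loop. In addition, the asyncio Subprocess APIs provide a way to start a process and communicate with it from the event loop. Lastly, the loop.run_in_executor() method mentioned above can be used with a concurrent.futures.ProcessPoolExecutor to execute code in a different process.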
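To illustrate the subprocess API mentioned above, the sketch below starts a child Python interpreter with asyncio.create_subprocess_exec(). It reads the child's output without blocking the event loop. Running sys.executable with -c is an assumption made to keep the example portable.

```python
import asyncio
import sys

async def main():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('hello from child')",
        stdout=asyncio.subprocess.PIPE,
    )
    # communicate() waits for the process to exit while the loop stays free.
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout.decode().strip()

returncode, output = asyncio.run(main())
print(returncode, output)
```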

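Running Blocking Code

Blocking (CPU-bound) code should not be called directly. For example, if a function performs a CPU-intensive calculation for 1 second, all concurrent asyncio Tasks and IO operations would be delayed by 1 second.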

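An executor can run a task in a different thread, in a different interpreter, or even in a different process. This avoids blocking the OS thread that runs the event loop. See the loop.run_in_executor() method for more details.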
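The delay described above can be measured. In this sketch, blocking_work() and measure_lag() are hypothetical helpers, and time.sleep() stands in for a CPU-bound calculation.

measure_lag() records how late a 50 ms asyncio.sleep() wakes up while a 300 ms blocking call runs. The call runs either directly in the loop thread or in the loop's default executor, via run_in_executor(None, ...). Exact timings depend on the machine.

```python
import asyncio
import time

def blocking_work(seconds):
    # Hypothetical stand-in for blocking or CPU-bound code.
    time.sleep(seconds)
    return seconds

async def measure_lag(interval):
    # How much later than requested does a short sleep wake up?
    start = time.monotonic()
    await asyncio.sleep(interval)
    return time.monotonic() - start - interval

async def direct():
    lag_task = asyncio.create_task(measure_lag(0.05))
    await asyncio.sleep(0)   # let lag_task start its sleep
    blocking_work(0.3)       # blocks the event loop thread
    return await lag_task

async def offloaded():
    loop = asyncio.get_running_loop()
    lag_task = asyncio.create_task(measure_lag(0.05))
    await asyncio.sleep(0)
    await loop.run_in_executor(None, blocking_work, 0.3)  # loop stays free
    return await lag_task

lag_direct = asyncio.run(direct())
lag_offloaded = asyncio.run(offloaded())
print(f"direct: {lag_direct:.2f}s late, offloaded: {lag_offloaded:.2f}s late")
```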

Logging

asyncio uses the logging module, and all logging is performed via the "asyncio" logger.

The default log level is logging.INFO, which can be easily adjusted:

logging.getLogger("asyncio").setLevel(logging.WARNING)

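Network logging can block the event loop. It is recommended to use a separate thread for handling logs or to use non-blocking IO. For an example, see Dealing with handlers that block.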
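One way to keep slow handlers off the event loop thread is the standard library's logging.handlers.QueueHandler and QueueListener pair. The loop thread only enqueues records, and a listener thread runs the real handler.

In this sketch, SlowHandler and the "app" logger name are hypothetical. SlowHandler stands in for a network handler and simply collects messages.

```python
import asyncio
import logging
import logging.handlers
import queue

records = []

class SlowHandler(logging.Handler):
    # Hypothetical stand-in for a handler that may block (e.g. network I/O).
    def emit(self, record):
        records.append(self.format(record))

log_queue = queue.Queue()
# The event loop thread only puts records on the queue (fast, non-blocking).
queue_handler = logging.handlers.QueueHandler(log_queue)
# The listener's own thread takes records off the queue and runs SlowHandler.
listener = logging.handlers.QueueListener(log_queue, SlowHandler())

logger = logging.getLogger("app")  # hypothetical application logger
logger.setLevel(logging.INFO)
logger.addHandler(queue_handler)
logger.propagate = False

async def main():
    logger.info("handled off the event loop thread")
    await asyncio.sleep(0)

listener.start()
try:
    asyncio.run(main())
finally:
    listener.stop()  # processes queued records, then stops the thread
print(records)
```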

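Detect never-awaited coroutines

Sometimes a coroutine function is called but not awaited (e.g. coro() instead of await coro()), or the coroutine is not scheduled with asyncio.create_task(). In either case, asyncio emits a RuntimeWarning: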

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

Output:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

Output in debug mode:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

The usual fix is to either await the coroutine or call the asyncio.create_task() function:

async def main():
    await test()
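The create_task() variant of the fix schedules test() to run concurrently instead of awaiting it immediately. This is a minimal sketch. Keeping a reference to the returned Task and awaiting it later is what lets main() observe its result. The return value "ok" is illustrative.

```python
import asyncio

async def test():
    print("scheduled")
    return "ok"

async def main():
    # Schedule test() as a Task; it starts running at the next await point.
    task = asyncio.create_task(test())
    # ... other work could run here concurrently ...
    return await task  # wait for the Task and collect its result

result = asyncio.run(main())
print(result)
```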

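Detect never-retrieved exceptions

If Future.set_exception() is called but the Future object is never awaited, the exception is never propagated to the user code. In this case, asyncio emits a log message when the Future object is garbage collected.

Example of an unhandled exception: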

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

Output:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

Enable debug mode to get the traceback showing where the task was created:

asyncio.run(main(), debug=True)

Output in debug mode:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed
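The log message goes away once the Task's exception is retrieved. One way to do that is to keep a reference to the Task and await it. Awaiting re-raises the exception inside main(), where it can be handled. This is a minimal sketch, and the returned string is illustrative.

```python
import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    task = asyncio.create_task(bug())
    try:
        # Awaiting the Task retrieves its exception and re-raises it here.
        await task
    except Exception as exc:
        return f"handled: {exc}"

result = asyncio.run(main())
print(result)
```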

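Asynchronous generators best practices

Writing correct and efficient asyncio code requires awareness of certain pitfalls. This section outlines essential best practices that can save you hours of debugging.

Close asynchronous generators explicitly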

It is recommended to manually close the asynchronous generator. If a generator exits early - for example, due to an exception raised in the body of an async for loop - its asynchronous cleanup code may run in an unexpected context. This can occur after the tasks it depends on have completed, or during the event loop shutdown when the async-generator's garbage collection hook is called.

To avoid this, explicitly close the generator by calling its aclose() method, or use the contextlib.aclosing() context manager:

import asyncio
import contextlib

async def gen():
  yield 1
  yield 2

async def func():
  async with contextlib.aclosing(gen()) as g:
    async for x in g:
      break  # Don't iterate until the end

asyncio.run(func())
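The same effect can be achieved without contextlib by calling aclose() in a finally block. This is a minimal sketch. The events list is a hypothetical probe that records when the generator's cleanup runs relative to the code after the loop.

```python
import asyncio

events = []

async def gen():
    try:
        yield 1
        yield 2
    finally:
        events.append("cleanup")  # runs when aclose() is awaited

async def func():
    g = gen()
    try:
        async for x in g:
            break  # leave the loop early
    finally:
        await g.aclose()  # run the generator's cleanup now, in this task
    events.append("after loop")

asyncio.run(func())
print(events)
```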

As noted above, the cleanup code for these asynchronous generators is deferred. The following example demonstrates that the finalization of an asynchronous generator can occur in an unexpected order:

import asyncio
work_done = False

async def cursor():
    try:
        yield 1
    finally:
        assert work_done

async def rows():
    global work_done
    try:
        yield 2
    finally:
        await asyncio.sleep(0.1)  # imitate some async work
        work_done = True


async def main():
    async for c in cursor():
        async for r in rows():
            break
        break

asyncio.run(main())

For this example, we get the following output:

unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-3' coro=<<async_generator_athrow without __name__>()> exception=AssertionError()>
Traceback (most recent call last):
  File "example.py", line 6, in cursor
    yield 1
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "example.py", line 8, in cursor
    assert work_done
           ^^^^^^^^^
AssertionError

The cursor() asynchronous generator was finalized before the rows generator - an unexpected behavior.

The example can be fixed by explicitly closing the cursor and rows async-generators:

import contextlib

async def main():
    async with contextlib.aclosing(cursor()) as cursor_gen:
        async for c in cursor_gen:
            async with contextlib.aclosing(rows()) as rows_gen:
                async for r in rows_gen:
                    break
            break

Create asynchronous generators only when the event loop is running

It is recommended to create asynchronous generators only after the event loop has been created.

To ensure that asynchronous generators close reliably, the event loop uses the sys.set_asyncgen_hooks() function to register callback functions. These callbacks update the list of running asynchronous generators to keep it in a consistent state.

When the loop.shutdown_asyncgens() function is called, the running generators are stopped gracefully and the list is cleared.

The asynchronous generator invokes the corresponding system hook during its first iteration. At the same time, the generator records that the hook has been called and does not call it again.

Therefore, if iteration begins before the event loop is created, the event loop will not be able to add the generator to its list of active generators because the hooks are set after the generator attempts to call them. Consequently, the event loop will not be able to terminate the generator if necessary.
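The hook registration described above can be observed with sys.get_asyncgen_hooks(). In this minimal sketch, both the firstiter and finalizer hooks are set while asyncio.run() is executing main(). Once the loop finishes, whatever hooks were in place before are restored.

```python
import asyncio
import sys

hooks_before = sys.get_asyncgen_hooks()

async def main():
    hooks = sys.get_asyncgen_hooks()
    # While the loop runs, it has installed its own firstiter/finalizer hooks.
    return hooks.firstiter is not None and hooks.finalizer is not None

hooks_installed = asyncio.run(main())
hooks_after = sys.get_asyncgen_hooks()
print("installed while running:", hooks_installed)
print("restored afterwards:", hooks_after == hooks_before)
```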

Consider the following example:

import asyncio

async def agenfn():
    try:
        yield 10
    finally:
        await asyncio.sleep(0)


with asyncio.Runner() as runner:
    agen = agenfn()
    print(runner.run(anext(agen)))
    del agen

Output:

10
Exception ignored while closing generator <async_generator object agenfn at 0x000002F71CD10D70>:
Traceback (most recent call last):
  File "example.py", line 13, in <module>
    del agen
        ^^^^
RuntimeError: async generator ignored GeneratorExit

The example can be fixed as follows:

import asyncio

async def agenfn():
    try:
        yield 10
    finally:
        await asyncio.sleep(0)

async def main():
    agen = agenfn()
    print(await anext(agen))
    del agen

asyncio.run(main())

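Avoid concurrent iteration and closure of the same generator

Async generators can be reentered while another __anext__() / athrow() / aclose() call is in progress. This may leave the async generator in an inconsistent state and cause errors.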

Let's consider the following example:

import asyncio

async def consumer():
    for idx in range(100):
        await asyncio.sleep(0)
        message = yield idx
        print('received', message)

async def amain():
    agenerator = consumer()
    await agenerator.asend(None)

    fa = asyncio.create_task(agenerator.asend('A'))
    fb = asyncio.create_task(agenerator.asend('B'))
    await fa
    await fb

asyncio.run(amain())

Output:

received A
Traceback (most recent call last):
  File "test.py", line 38, in <module>
    asyncio.run(amain())
    ~~~~~~~~~~~^^^^^^^^^
  File "Lib/asyncio/runners.py", line 204, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "Lib/asyncio/runners.py", line 127, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "Lib/asyncio/base_events.py", line 719, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "test.py", line 36, in amain
    await fb
RuntimeError: anext(): asynchronous generator is already running

Therefore, it is recommended to avoid using asynchronous generators in parallel tasks or across multiple event loops.
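Sometimes several tasks really must share one asynchronous generator. One possible workaround, not taken from the example above, is to serialize every asend() call with an asyncio.Lock. That way, no call starts while another is still in progress. This is a minimal sketch, and the send() helper and the received list are hypothetical.

```python
import asyncio

received = []

async def consumer():
    for idx in range(100):
        await asyncio.sleep(0)
        message = yield idx
        received.append(message)

async def amain():
    agenerator = consumer()
    await agenerator.asend(None)
    lock = asyncio.Lock()  # only one asend() may run at a time

    async def send(value):
        # Hypothetical helper: wait for any in-progress call to finish first.
        async with lock:
            return await agenerator.asend(value)

    fa = asyncio.create_task(send('A'))
    fb = asyncio.create_task(send('B'))
    results = [await fa, await fb]
    await agenerator.aclose()  # close the generator explicitly
    return results

results = asyncio.run(amain())
print(results, received)
```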