用 asyncio 开发¶
异步编程与传统的“顺序”编程不同。
本页列出常见的错误和陷阱,并解释如何避免它们。
Debug 模式¶
默认情况下,asyncio 以生产模式运行。为了简化开发,asyncio 还有一种*debug 模式* 。
有几种方法可以启用异步调试模式:
将
PYTHONASYNCIODEBUG环境变量设置为1。使用 Python 开发模式。
将
debug=True传递给asyncio.run()。调用
loop.set_debug()。
除了启用调试模式外,还要考虑:
将 asyncio logger 的级别设为
logging.DEBUG,例如下面的代码片段可以在应用程序启动时运行:logging.basicConfig(level=logging.DEBUG)
配置
warnings模块以显示ResourceWarning警告。一种方法是使用-Wdefault命令行选项。
启用调试模式时:
许多非线程安全的异步 APIs (例如
loop.call_soon()和loop.call_at()方法),如果从错误的线程调用,则会引发异常。如果执行 I/O 操作花费的时间太长,则记录 I/O 选择器的执行时间。
执行时间超过 100 毫秒的回调会被写入日志。
loop.slow_callback_duration属性可用于设置以秒为单位的将被视为“缓慢”的最小执行持续时间。
并发性和多线程¶
事件循环在线程中运行 (通常是主线程),并在其线程中执行所有回调和任务。当一个任务在事件循环中运行时,没有其他任务可以在同一个线程中运行。当一个任务执行一个 await 表达式时,正在运行的任务被挂起,事件循环执行下一个任务。
要调度来自另一 OS 线程的 callback,应该使用 loop.call_soon_threadsafe() 方法。 例如:
loop.call_soon_threadsafe(callback, *args)
几乎所有异步对象都不是线程安全的,这通常不是问题,除非在任务或回调函数之外有代码可以使用它们。如果需要这样的代码来调用低级异步 API,应该使用 loop.call_soon_threadsafe() 方法,例如:
loop.call_soon_threadsafe(fut.cancel)
要从不同的 OS 线程调度一个协程对象,应该使用 run_coroutine_threadsafe() 函数。它返回一个 concurrent.futures.Future。查询结果:
async def coro_func():
return await asyncio.sleep(1, 42)
# 随后在另一个 OS 线程中:
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# 等待结果:
result = future.result()
为了能处理信号,事件循环必须在主线程中运行。
loop.run_in_executor() 方法可以配合 concurrent.futures.ThreadPoolExecutor 或 InterpreterPoolExecutor 使用以在不同的 OS 线程中执行阻塞型的代码而不会阻塞事件循环运行所在的 OS 线程。
目前没有办法直接从另一个进程(如使用 multiprocessing 启动的进程)安排协程或回调。 Event loop methods 小节列出了一些可以从管道读取并监视文件描述符而不会阻塞事件循环的 API。此外,asyncio 的 子进程 API 提供了一种启动进程并在事件循环中与其通信的办法。最后,之前提到的 loop.run_in_executor() 方法也可以配合 concurrent.futures.ProcessPoolExecutor 使用以在另一个进程中执行代码。
运行阻塞的代码¶
不应该直接调用阻塞 ( CPU 绑定) 代码。例如,如果一个函数执行 1 秒的 CPU 密集型计算,那么所有并发异步任务和 IO 操作都将延迟 1 秒。
可以使用执行器让任务运行在不同的线程中,包括不同的解释器中,或者甚至在不同的进程中以避免阻塞事件循环所在的 OS 线程。请参阅 loop.run_in_executor() 方法了解详情。
日志记录¶
asyncio 使用 logging 模块,所有日志记录都是通过 "asyncio" logger 执行的。
默认的日志级别是 logging.INFO,它可以被方便地调整:
logging.getLogger("asyncio").setLevel(logging.WARNING)
网络日志会阻塞事件循环。建议使用一个单独线程来处理日志或者使用非阻塞式 IO。例如,可以参阅 处理日志 handler 的阻塞。
检测从未被等待的协程¶
当协程函数被调用而不是被等待时 (即执行 coro() 而不是 await coro()) 或者协程没有通过 asyncio.create_task() 被排入计划日程,asyncio 将会发出一条 RuntimeWarning:
import asyncio
async def test():
print("never scheduled")
async def main():
test()
asyncio.run(main())
输出:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
test()
调试模式的输出:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
File "../t.py", line 7, in main
test()
test()
通常的修复方法是等待协程或者调用 asyncio.create_task() 函数:
async def main():
await test()
检测从未被获取的异常¶
如果调用了 Future.set_exception(),但 Future 对象从未被等待,异常将永远不会传播到用户代码。在这种情况下,当 Future 对象被垃圾收集时,asyncio 将发出一条日志消息。
未处理异常的例子:
import asyncio
async def bug():
raise Exception("not consumed")
async def main():
asyncio.create_task(bug())
asyncio.run(main())
输出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed')>
Traceback (most recent call last):
File "test.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed
激活调试模式 以获取任务创建处的跟踪信息:
asyncio.run(main(), debug=True)
调试模式的输出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed') created at asyncio/tasks.py:321>
source_traceback: Object created at (most recent call last):
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
Traceback (most recent call last):
File "../t.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed
异步生成器最佳实践¶
要编写正确且高效的 asyncio 代码就必须注意某些陷阱。 本节概述了可为您节省大量调试时间的关键最佳实践。
显式地关闭异步生成器¶
It is recommended to manually close the
asynchronous generator. If a generator
exits early - for example, due to an exception raised in the body of
an async for loop - its asynchronous cleanup code may run in an
unexpected context. This can occur after the tasks it depends on have completed,
or during the event loop shutdown when the async-generator's garbage collection
hook is called.
To avoid this, explicitly close the generator by calling its
aclose() method, or use the contextlib.aclosing()
context manager:
import asyncio
import contextlib
async def gen():
yield 1
yield 2
async def func():
async with contextlib.aclosing(gen()) as g:
async for x in g:
break # Don't iterate until the end
asyncio.run(func())
As noted above, the cleanup code for these asynchronous generators is deferred. The following example demonstrates that the finalization of an asynchronous generator can occur in an unexpected order:
import asyncio
work_done = False
async def cursor():
try:
yield 1
finally:
assert work_done
async def rows():
global work_done
try:
yield 2
finally:
await asyncio.sleep(0.1) # immitate some async work
work_done = True
async def main():
async for c in cursor():
async for r in rows():
break
break
asyncio.run(main())
For this example, we get the following output:
unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-3' coro=<<async_generator_athrow without __name__>()> exception=AssertionError()>
Traceback (most recent call last):
File "example.py", line 6, in cursor
yield 1
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "example.py", line 8, in cursor
assert work_done
^^^^^^^^^
AssertionError
The cursor() asynchronous generator was finalized before the rows
generator - an unexpected behavior.
The example can be fixed by explicitly closing the
cursor and rows async-generators:
async def main():
async with contextlib.aclosing(cursor()) as cursor_gen:
async for c in cursor_gen:
async with contextlib.aclosing(rows()) as rows_gen:
async for r in rows_gen:
break
break
Create asynchronous generators only when the event loop is running¶
It is recommended to create asynchronous generators only after the event loop has been created.
To ensure that asynchronous generators close reliably, the event loop uses the
sys.set_asyncgen_hooks() function to register callback functions. These
callbacks update the list of running asynchronous generators to keep it in a
consistent state.
When the loop.shutdown_asyncgens()
function is called, the running generators are stopped gracefully and the
list is cleared.
The asynchronous generator invokes the corresponding system hook during its first iteration. At the same time, the generator records that the hook has been called and does not call it again.
Therefore, if iteration begins before the event loop is created, the event loop will not be able to add the generator to its list of active generators because the hooks are set after the generator attempts to call them. Consequently, the event loop will not be able to terminate the generator if necessary.
比如下面的例子:
import asyncio
async def agenfn():
try:
yield 10
finally:
await asyncio.sleep(0)
with asyncio.Runner() as runner:
agen = agenfn()
print(runner.run(anext(agen)))
del agen
输出:
10
Exception ignored while closing generator <async_generator object agenfn at 0x000002F71CD10D70>:
Traceback (most recent call last):
File "example.py", line 13, in <module>
del agen
^^^^
RuntimeError: async generator ignored GeneratorExit
该示例可用以下方式修复:
import asyncio
async def agenfn():
try:
yield 10
finally:
await asyncio.sleep(0)
async def main():
agen = agenfn()
print(await anext(agen))
del agen
asyncio.run(main())
避免同一迭代器的并发迭代和闭包¶
异步生成器可以在其他 __anext__() / athrow() / aclose() 调用正在进行时重新进入。 这可能导致异步生成器状态不一致并造成错误。
Let's consider the following example:
import asyncio
async def consumer():
for idx in range(100):
await asyncio.sleep(0)
message = yield idx
print('received', message)
async def amain():
agenerator = consumer()
await agenerator.asend(None)
fa = asyncio.create_task(agenerator.asend('A'))
fb = asyncio.create_task(agenerator.asend('B'))
await fa
await fb
asyncio.run(amain())
输出:
received A
Traceback (most recent call last):
File "test.py", line 38, in <module>
asyncio.run(amain())
~~~~~~~~~~~^^^^^^^^^
File "Lib/asyncio/runners.py", line 204, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File "Lib/asyncio/runners.py", line 127, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "Lib/asyncio/base_events.py", line 719, in run_until_complete
return future.result()
~~~~~~~~~~~~~^^
File "test.py", line 36, in amain
await fb
RuntimeError: anext(): asynchronous generator is already running
Therefore, it is recommended to avoid using asynchronous generators in parallel tasks or across multiple event loops.