asyncio での開発¶
非同期プログラミングは伝統的な "同期的" プログラミングとは異なります。
このページはよくある間違いや落とし穴を列挙し、それらを回避する方法を説明します。
デバッグモード¶
asyncio はデフォルトで本運用モードで実行されます。いっぽう、開発を容易にするために asyncio は "デバッグモード" を持っています。
asyncio のデバッグモードを有効化する方法はいくつかあります:
PYTHONASYNCIODEBUG環境変数の値を1に設定する。Python 開発モード を使う。
asyncio.run()実行時にdebug=Trueを設定する。loop.set_debug()を呼び出す。
デバッグモードを有効化することに加え、以下も検討してください:
asyncio ロガー のログレベルを
logging.DEBUGに設定します。例えばアプリケーションの起動時に以下を実行します:logging.basicConfig(level=logging.DEBUG)
warningsモジュールがResourceWarning警告を表示するように設定します。やり方のひとつは-Wdefaultコマンドラインオプションを使うことです。
デバッグモードが有効化されたときの動作:
スレッドセーフでない asyncio APIs の多く (
loop.call_soon()やloop.call_at()など) は、誤ったスレッドから呼び出されたときに例外を送出します。I/O セレクタが I/O 処理を実行する時間が長すぎる場合、その実行時間が記録されます。
実行時間が100ミリ秒を超えるコールバックは記録されます。 "遅い" の判断基準となる実行時間の最小値は
loop.slow_callback_duration属性で設定できます。
並行処理とマルチスレッド処理¶
イベントループはスレッド(典型的にはメインスレッド)内で動作し、すべてのコールバックとタスクをそのスレッド内で実行します。ひとつのタスクがイベントループ内で実行される間、他のタスクを同じスレッド内で実行することはできません。タスクが await 式を実行すると、実行中のタスクはサスペンドされ、イベントループは次のタスクを実行します。
別の OS スレッドからのコールバック (callback) をスケジュールする場合、 loop.call_soon_threadsafe() メソッドを使ってください。例:
loop.call_soon_threadsafe(callback, *args)
ほぼ全ての非同期オブジェクトはスレッドセーフではありませんが、タスクやコールバックの外側で非同期オブジェクトを使うコードが存在しない限り、それが問題にはなることはほとんどありません。もしそのような目的で低レベルの asyncio API を呼び出すようなコードを書く必要がある場合、 loop.call_soon_threadsafe() メソッドを使ってください。例:
loop.call_soon_threadsafe(fut.cancel)
別の OS スレッドからコルーチンオブジェクトをスケジュールする場合は、 run_coroutine_threadsafe() メソッドを使ってください。 run_coroutine_threadsafe() は結果にアクセスするための concurrent.futures.Future オブジェクトを返します:
async def coro_func():
return await asyncio.sleep(1, 42)
# Later in another OS thread:
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()
シグナルの処理を行うには、イベントループはメインスレッド内で実行しなければなりません。
The loop.run_in_executor() method can be used with a
concurrent.futures.ThreadPoolExecutor or
InterpreterPoolExecutor to execute
blocking code in a different OS thread without blocking the OS thread
that the event loop runs in.
現在のところ、 (たとえば multiprocessing で開始したような) 別のプロセスからコルーチンやコールバックを直接スケジュールすることはできません。 Event loop methods 節では、イベントループをブロックすることなくパイプからの読み込みやファイルデスクリプタの監視ができる API のリストを掲載しています。さらに、 asyncio の サブプロセス API はイベントループからプロセスを開始したりプロセスと通信したりする方法を提供します。 最後に、前述の loop.run_in_executor() メソッドは concurrent.futures.ProcessPoolExecutor とともに使用することで、別のプロセス内でコードを実行することもできます。
ブロッキングコードの実行¶
ブロッキングコード (CPU バウンドなコード) を直接呼び出すべきではありません。たとえば、 CPU 負荷の高い関数を1秒実行したとすると、並行に処理されている全ての非同期タスクと I/O 処理は1秒遅れる可能性があります。
An executor can be used to run a task in a different thread,
including in a different interpreter, or even in
a different process to avoid blocking the OS thread with the
event loop. See the loop.run_in_executor() method for more
details.
ログ記録¶
asyncio は logging モジュールを利用し、 全てのログ記録は "asyncio" ロガーを通じて行われます。
デフォルトのログレベルは logging.INFO ですが、これは簡単に調節できます:
logging.getLogger("asyncio").setLevel(logging.WARNING)
ネットワークログ記録は、イベントループをブロックし得ます。ログ処理のスレッドを分離するか、ノンブロッキング IO を使用することを推奨します。例えば、 ブロックする handler を扱う を見てください。
待ち受け処理を伴わないコルーチンの検出¶
コルーチンが呼び出されただけで、待ち受け処理がない場合 (たとえば await coro() のかわりに coro() と書いてしまった場合) 、またはコルーチンが asyncio.create_task() を使わずにスケジュールされた場合、 asyncio は RuntimeWarning 警告を送出します:
import asyncio
async def test():
print("never scheduled")
async def main():
test()
asyncio.run(main())
出力:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
test()
デバッグモードの出力:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
File "../t.py", line 7, in main
test()
test()
通常の修正方法はコルーチンを待ち受ける (await) か、 asyncio.create_task() 関数を呼び出すことです:
async def main():
await test()
回収されない例外の検出¶
もし Future.set_exception() メソッドが呼び出されても、その Future オブジェクトを待ち受けていなければ、例外は決してユーザーコードまで伝播しません。この場合 asyncio は、 Future オブジェクトがガベージコレクションの対象となったときにログメッセージを送出することがあります。
処理されない例外の例:
import asyncio
async def bug():
raise Exception("not consumed")
async def main():
asyncio.create_task(bug())
asyncio.run(main())
出力:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed')>
Traceback (most recent call last):
File "test.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed
タスクが生成された箇所を特定するには、 デバッグモードを有効化して トレースバックを取得してください:
asyncio.run(main(), debug=True)
デバッグモードの出力:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed') created at asyncio/tasks.py:321>
source_traceback: Object created at (most recent call last):
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
Traceback (most recent call last):
File "../t.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed
Asynchronous generators best practices¶
Writing correct and efficient asyncio code requires awareness of certain pitfalls. This section outlines essential best practices that can save you hours of debugging.
Close asynchronous generators explicitly¶
It is recommended to manually close the
asynchronous generator. If a generator
exits early - for example, due to an exception raised in the body of
an async for loop - its asynchronous cleanup code may run in an
unexpected context. This can occur after the tasks it depends on have completed,
or during the event loop shutdown when the async-generator's garbage collection
hook is called.
To avoid this, explicitly close the generator by calling its
aclose() method, or use the contextlib.aclosing()
context manager:
import asyncio
import contextlib
async def gen():
yield 1
yield 2
async def func():
async with contextlib.aclosing(gen()) as g:
async for x in g:
break # Don't iterate until the end
asyncio.run(func())
As noted above, the cleanup code for these asynchronous generators is deferred. The following example demonstrates that the finalization of an asynchronous generator can occur in an unexpected order:
import asyncio
work_done = False
async def cursor():
try:
yield 1
finally:
assert work_done
async def rows():
global work_done
try:
yield 2
finally:
await asyncio.sleep(0.1) # immitate some async work
work_done = True
async def main():
async for c in cursor():
async for r in rows():
break
break
asyncio.run(main())
For this example, we get the following output:
unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-3' coro=<<async_generator_athrow without __name__>()> exception=AssertionError()>
Traceback (most recent call last):
File "example.py", line 6, in cursor
yield 1
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "example.py", line 8, in cursor
assert work_done
^^^^^^^^^
AssertionError
The cursor() asynchronous generator was finalized before the rows
generator - an unexpected behavior.
The example can be fixed by explicitly closing the
cursor and rows async-generators:
async def main():
async with contextlib.aclosing(cursor()) as cursor_gen:
async for c in cursor_gen:
async with contextlib.aclosing(rows()) as rows_gen:
async for r in rows_gen:
break
break
Create asynchronous generators only when the event loop is running¶
It is recommended to create asynchronous generators only after the event loop has been created.
To ensure that asynchronous generators close reliably, the event loop uses the
sys.set_asyncgen_hooks() function to register callback functions. These
callbacks update the list of running asynchronous generators to keep it in a
consistent state.
When the loop.shutdown_asyncgens()
function is called, the running generators are stopped gracefully and the
list is cleared.
The asynchronous generator invokes the corresponding system hook during its first iteration. At the same time, the generator records that the hook has been called and does not call it again.
Therefore, if iteration begins before the event loop is created, the event loop will not be able to add the generator to its list of active generators because the hooks are set after the generator attempts to call them. Consequently, the event loop will not be able to terminate the generator if necessary.
次に示す例について考えてみてください。:
import asyncio
async def agenfn():
try:
yield 10
finally:
await asyncio.sleep(0)
with asyncio.Runner() as runner:
agen = agenfn()
print(runner.run(anext(agen)))
del agen
出力:
10
Exception ignored while closing generator <async_generator object agenfn at 0x000002F71CD10D70>:
Traceback (most recent call last):
File "example.py", line 13, in <module>
del agen
^^^^
RuntimeError: async generator ignored GeneratorExit
This example can be fixed as follows:
import asyncio
async def agenfn():
try:
yield 10
finally:
await asyncio.sleep(0)
async def main():
agen = agenfn()
print(await anext(agen))
del agen
asyncio.run(main())
Avoid concurrent iteration and closure of the same generator¶
Async generators may be reentered while another
__anext__() / athrow() / aclose() call is in
progress. This may lead to an inconsistent state of the async generator and can
cause errors.
Let's consider the following example:
import asyncio
async def consumer():
for idx in range(100):
await asyncio.sleep(0)
message = yield idx
print('received', message)
async def amain():
agenerator = consumer()
await agenerator.asend(None)
fa = asyncio.create_task(agenerator.asend('A'))
fb = asyncio.create_task(agenerator.asend('B'))
await fa
await fb
asyncio.run(amain())
出力:
received A
Traceback (most recent call last):
File "test.py", line 38, in <module>
asyncio.run(amain())
~~~~~~~~~~~^^^^^^^^^
File "Lib/asyncio/runners.py", line 204, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File "Lib/asyncio/runners.py", line 127, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "Lib/asyncio/base_events.py", line 719, in run_until_complete
return future.result()
~~~~~~~~~~~~~^^
File "test.py", line 36, in amain
await fb
RuntimeError: anext(): asynchronous generator is already running
Therefore, it is recommended to avoid using asynchronous generators in parallel tasks or across multiple event loops.