asyncio 、たまにしか使わないせいか、よくある書き方をちょいちょい忘れるので、自分なりに整理しておきます。
もし他にもオススメの書き方とかあれば、コメント等で教えてもらえれば大変ありがたいです!
レシピ - 基本編
基本の使い方(呼び出し編、作成編)
まずは基本形。公式ドキュメントの Hello World! ほぼそのままですが。
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(2) # 2秒待ち
print('World!')
if __name__ == "__main__":
asyncio.run(main())
- 非同期処理される関数には
async
をつける - 非同期処理を呼び出すときは
await
をつける
また、asyncio の sleep は asyncio.sleep()
を使います。うっかり time.sleep()
を使うと、非同期処理をしているスレッドが止まってしまうので注意してください。
なお、 async
がついた関数のことを Python ではコルーチンと呼びます。(公式ドキュメントの説明)
並列実行
asyncio.gather()
を使います。
async def main():
# 並列処理したいコルーチンを asyncio.gather の引数に渡します
await asyncio.gather(
hello("Taro"),
hello("Jiro"),
hello("Saburo")
)
async def hello(name: str, wait_time: int = 2):
""" サンプルのコルーチン """
print('Hello ...')
await asyncio.sleep(wait_time)
print(f'{name}!')
return name # 戻り値に name を返す
コルーチンを list で持つこともできます。その場合は
co_list = [hello("Taro"), hello("Jiro"), hello("Saburo")]
await asyncio.gather(*co_list)
と、先頭に *
をつければ OK です (先頭に *
をつけるのは、list を引数に展開する Python の文法ですね)。
asyncio.gather()
の戻り値は、各コルーチンの戻り値の list になります。
result = await asyncio.gather(
hello("Taro"),
hello("Jiro"),
hello("Saburo")
)
# result = ["Taro", "Jiro", "Saburo"]
並列実行 - 例外発生時にも asyncio.gather
の結果が使えるようにする
asyncio.gather
に return_exceptions=True
という引数を渡すと、呼び出し先で例外が出た場合でも asyncio.gather
の実行を継続できます。
async def main():
result = await asyncio.gather(
hello("Taro"),
hello("Jiro"),
hello("Saburo"),
return_exceptions=True
)
こうしておくと、コルーチンのなかで例外が出た場合にも、その戻り値に例外が返るようになります。他の処理も通常どおり継続されます。1
レシピ - 応用編
応用というほどのものでもないですが、ちょっと手の混んだやり方をこちらに。
投げっぱなしコルーチン
処理によっては、どこか適当なタイミングでやってもらえばよいので、呼び出しのタイミングで await したくない、という処理もあります。そういうときは asyncio.create_task()
を使います。
async def main():
# これで hello("Taro") が起動し、かつ、完了を待たずに main() の処理は継続します
task = asyncio.create_task(hello("Taro"))
...
# どこかのタイミングで await すると、hello("Taro") 含めた、別のコルーチンに制御が移ります
await asyncio.sleep(2)
このように asyncio.create_task()
を使うことで、await
せずにコルーチンを実行させることができます。
タイムアウトの設定
asyncio.wait_for()
を使うと、呼び出す処理にタイムアウトを設定できます。
async def main():
results = await asyncio.wait_for(hello("Taro"), timeout=1)
タイムアウトした場合は asyncio.futures.TimeoutError
例外が発生します。hello("Taro")
は処理が中断されます。
タイムアウトの設定(タイムアウト後も継続)
asyncio.wait_for()
でタイムアウトを設定したときに、 asyncio.shield
を一緒に使うと、タイムアウトをした処理も、処理が中断しないようにできます。
async def main():
try:
results = await asyncio.wait_for(asyncio.shield(hello("Taro")),
timeout=0.5)
print(results)
except asyncio.futures.TimeoutError as e:
print("timeout!")
await asyncio.sleep(2) # ここで hello("Taro") の実行が継続していることが確認できる
async の処理のなかから、一部の処理を別 thread で処理させる
asyncio の非同期処理と、concurrent.futures によるスレッド、共存させることができます。
例えばですが、 FastAPI のような async で各処理が呼ばれるフレームワークを使っているときに、async に対応していない重い処理を呼び出したいとします。そのまま普通に呼び出すと、そこでブロッキングしてしまいますが、別スレッドで実行することで非同期処理をブロッキングせずに重い処理を実行することができます。
具体的には、以下のようにします。
# Python 3.9 の場合
async def main():
# asyncio.to_thread() により、実際の処理を別スレッドで
# 実行するコルーチンが生成されます
co = asyncio.to_thread(heavy_task, "heavy!")
# あとは、通常のコルーチンと同じように呼び出せば OK
results = await asyncio.gather(
hello("Taro"),
hello("Jiro"),
hello("Saburo"),
co
)
print(results)
# Python 3.8 以前、または自前でスレッドプールを管理したい場合
import asyncio
import concurrent.futures
async def main():
# 別処理を実行するためのスレッドプールを生成
with concurrent.futures.ThreadPoolExecutor() as executor:
# loop.run_in_executor() により、実際の処理を別スレッドで
# 実行するコルーチンが生成されます
loop = asyncio.get_running_loop() # py3.6 以前は asyncio.get_event_loop()
co = loop.run_in_executor(executor, heavy_task, "heavy!")
# あとは、通常のコルーチンと同じように呼び出せば OK
results = await asyncio.gather(
hello("Taro"),
hello("Jiro"),
hello("Saburo"),
co
)
print(results)
このようにすると asyncio と協調する形で、重い処理を別スレッドで実行させることができます。
asyncio.to_thread
, run_in_executor
とも、呼び出し方が少し特殊です。
asyncio.to_thread
の場合でいうと、最初の引数に関数そのものを、第2引数以降に、その関数にわたす引数を指定します。asyncio.to_thread(heavy_task("heavy!"))
と書くと、その場で heavy_task("heavy!")
が実行されてしまうので注意してください。
run_in_executor
の場合も、最初の引数が executor
になることを除けば同様です。
なお、 ThreadPoolExecutor()
を ProcessPoolExecutor()
に置き換えると、マルチスレッドではなくマルチプロセスで動作させるようにもできます。
参考資料 - "Combining ayncio and threads in the same application" (PyConJP 2020)
tqdm で進捗を出す
asyncioとtqdmを一緒に使う - Freck こちらのブログで紹介されていた方法そのままです。
tqdm はループの進捗を出してくれる便利ツールです。pip
を使うとパッケージのダウンロード状況がプログレスバーで出てきますが、あんな感じの出力を簡単に実現できます。
コルーチンの進捗を tqdm に食わせるには asyncio.as_completed()
を使うとよいです。これで、複数の非同期処理の進み具合を tqdm で出すことができます。
from tqdm import tqdm
async def main():
tasks = [hello("Taro", wait_time=3),
hello("Jiro", wait_time=2),
hello("Saburo", wait_time=1)]
for f in tqdm(asyncio.as_completed(tasks)):
result = await f
print(result)
asyncio.gather()
と違って、処理が完了した順に結果が得られるので、その点は注意してください。この例だと、 wait_time が短い順で処理が完了するので result
は "Saburo", "Jiro", "Taro"
となります。
参考ドキュメント
いずれも公式ドキュメントです。
- asyncio --- 非同期 I/O — Python 3.9.1 ドキュメント asyncio の全体的な説明
- 高水準の API インデックス — Python 3.9.1 ドキュメント asyncio の "使いやすい" 関数の一覧
- コルーチンと Task — Python 3.9.1 ドキュメント コルーチンを動かす手法として await で呼ぶなど、さまざまな方法があることが解説されています
-
イベントループ — Python 3.9.1 ドキュメント スレッドの併用でつかった
asyncio.get_running_loop()
などの説明 - Developing with asyncio — Python 3.9.1 ドキュメント デバッグモードでの呼び出しなど、少しマニアックな話題
分かりにくい点などありましたら、コメントもらえれば改善していければと思いますので、お気軽にコメントください!
-
正確には、
return_exceptions=False
のときでも並列で動いているコルーチンは中断されません。ただ、コルーチンを待つ部分は例外が生じたことで "抜けて" しまっています。そのため、その後に await する非同期処理がないと、そのままコルーチンの完了を待たずにプログラムが終了します。 ↩