56
40

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Python の asyncio レシピ集

Posted at

asyncio 、たまにしか使わないせいか、よくある書き方をちょいちょい忘れるので、自分なりに整理しておきます。

もし他にもオススメの書き方とかあれば、コメント等で教えてもらえれば大変ありがたいです!

レシピ - 基本編

基本の使い方(呼び出し編、作成編)

まずは基本形。公式ドキュメントの Hello World! ほぼそのままですが。

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(2)  # 2秒待ち
    print('World!')

if __name__ == "__main__":
    asyncio.run(main())
  • 非同期処理される関数には async をつける
  • 非同期処理を呼び出すときは await をつける

また、asyncio の sleep は asyncio.sleep() を使います。うっかり time.sleep() を使うと、非同期処理をしているスレッドが止まってしまうので注意してください。

なお、 async がついた関数のことを Python ではコルーチンと呼びます。(公式ドキュメントの説明)

並列実行

asyncio.gather() を使います。

async def main():
    # 並列処理したいコルーチンを asyncio.gather の引数に渡します
    await asyncio.gather(
        hello("Taro"),
        hello("Jiro"),
        hello("Saburo")
    )

async def hello(name: str, wait_time: int = 2):
    """ サンプルのコルーチン """
    print('Hello ...')
    await asyncio.sleep(wait_time)
    print(f'{name}!')
    return name  # 戻り値に name を返す

コルーチンを list で持つこともできます。その場合は

    co_list = [hello("Taro"), hello("Jiro"), hello("Saburo")]
    await asyncio.gather(*co_list)

と、先頭に * をつければ OK です (先頭に * をつけるのは、list を引数に展開する Python の文法ですね)。

asyncio.gather() の戻り値は、各コルーチンの戻り値の list になります。

    result = await asyncio.gather(
        hello("Taro"),
        hello("Jiro"),
        hello("Saburo")
    )
    # result = ["Taro", "Jiro", "Saburo"]

並列実行 - 例外発生時にも asyncio.gather の結果が使えるようにする

asyncio.gatherreturn_exceptions=True という引数を渡すと、呼び出し先で例外が出た場合でも asyncio.gather の実行を継続できます。

async def main():
    result = await asyncio.gather(
        hello("Taro"),
        hello("Jiro"),
        hello("Saburo"),
        return_exceptions=True
    )

こうしておくと、コルーチンのなかで例外が出た場合にも、その戻り値に例外が返るようになります。他の処理も通常どおり継続されます。1

レシピ - 応用編

応用というほどのものでもないですが、ちょっと手の混んだやり方をこちらに。

投げっぱなしコルーチン

処理によっては、どこか適当なタイミングでやってもらえばよいので、呼び出しのタイミングで await したくない、という処理もあります。そういうときは asyncio.create_task() を使います。

async def main():
    # これで hello("Taro") が起動し、かつ、完了を待たずに main() の処理は継続します
    task = asyncio.create_task(hello("Taro"))
    ...
    # どこかのタイミングで await すると、hello("Taro") 含めた、別のコルーチンに制御が移ります
    await asyncio.sleep(2)

このように asyncio.create_task() を使うことで、await せずにコルーチンを実行させることができます。

タイムアウトの設定

asyncio.wait_for() を使うと、呼び出す処理にタイムアウトを設定できます。

async def main():
    results = await asyncio.wait_for(hello("Taro"), timeout=1)

タイムアウトした場合は asyncio.futures.TimeoutError 例外が発生します。hello("Taro") は処理が中断されます。

タイムアウトの設定(タイムアウト後も継続)

asyncio.wait_for() でタイムアウトを設定したときに、 asyncio.shield を一緒に使うと、タイムアウトをした処理も、処理が中断しないようにできます。

async def main():
    try:
        results = await asyncio.wait_for(asyncio.shield(hello("Taro")),
                                         timeout=0.5)
        print(results)
    except asyncio.futures.TimeoutError as e:
        print("timeout!")

    await asyncio.sleep(2)  # ここで hello("Taro") の実行が継続していることが確認できる

async の処理のなかから、一部の処理を別 thread で処理させる

asyncio の非同期処理と、concurrent.futures によるスレッド、共存させることができます。

例えばですが、 FastAPI のような async で各処理が呼ばれるフレームワークを使っているときに、async に対応していない重い処理を呼び出したいとします。そのまま普通に呼び出すと、そこでブロッキングしてしまいますが、別スレッドで実行することで非同期処理をブロッキングせずに重い処理を実行することができます。

具体的には、以下のようにします。

# Python 3.9 の場合
async def main():
    # asyncio.to_thread() により、実際の処理を別スレッドで
    # 実行するコルーチンが生成されます
    co = asyncio.to_thread(heavy_task, "heavy!")

    # あとは、通常のコルーチンと同じように呼び出せば OK
    results = await asyncio.gather(
        hello("Taro"),
        hello("Jiro"),
        hello("Saburo"),
        co
    )
    print(results)

# Python 3.8 以前、または自前でスレッドプールを管理したい場合
import asyncio
import concurrent.futures


async def main():
    # 別処理を実行するためのスレッドプールを生成
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # loop.run_in_executor() により、実際の処理を別スレッドで
        # 実行するコルーチンが生成されます
        loop = asyncio.get_running_loop()  # py3.6 以前は asyncio.get_event_loop()
        co = loop.run_in_executor(executor, heavy_task, "heavy!")

        # あとは、通常のコルーチンと同じように呼び出せば OK
        results = await asyncio.gather(
            hello("Taro"),
            hello("Jiro"),
            hello("Saburo"),
            co
        )
        print(results)

このようにすると asyncio と協調する形で、重い処理を別スレッドで実行させることができます。

asyncio.to_thread, run_in_executor とも、呼び出し方が少し特殊です。

asyncio.to_thread の場合でいうと、最初の引数に関数そのものを、第2引数以降に、その関数にわたす引数を指定します。asyncio.to_thread(heavy_task("heavy!")) と書くと、その場で heavy_task("heavy!") が実行されてしまうので注意してください。

run_in_executor の場合も、最初の引数が executor になることを除けば同様です。

なお、 ThreadPoolExecutor()ProcessPoolExecutor() に置き換えると、マルチスレッドではなくマルチプロセスで動作させるようにもできます。

参考資料 - "Combining ayncio and threads in the same application" (PyConJP 2020)

tqdm で進捗を出す

asyncioとtqdmを一緒に使う - Freck こちらのブログで紹介されていた方法そのままです。

tqdm はループの進捗を出してくれる便利ツールです。pip を使うとパッケージのダウンロード状況がプログレスバーで出てきますが、あんな感じの出力を簡単に実現できます。

コルーチンの進捗を tqdm に食わせるには asyncio.as_completed() を使うとよいです。これで、複数の非同期処理の進み具合を tqdm で出すことができます。

from tqdm import tqdm

async def main():
    tasks = [hello("Taro", wait_time=3),
             hello("Jiro", wait_time=2),
             hello("Saburo", wait_time=1)]

    for f in tqdm(asyncio.as_completed(tasks)):
        result = await f
        print(result)

asyncio.gather() と違って、処理が完了した順に結果が得られるので、その点は注意してください。この例だと、 wait_time が短い順で処理が完了するので result"Saburo", "Jiro", "Taro" となります。

参考ドキュメント

いずれも公式ドキュメントです。

分かりにくい点などありましたら、コメントもらえれば改善していければと思いますので、お気軽にコメントください!

  1. 正確には、return_exceptions=False のときでも並列で動いているコルーチンは中断されません。ただ、コルーチンを待つ部分は例外が生じたことで "抜けて" しまっています。そのため、その後に await する非同期処理がないと、そのままコルーチンの完了を待たずにプログラムが終了します。

56
40
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
56
40

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?