1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

asyncioで非同期処理

Posted at

asyncioは非同期処理を実行するためのPython標準ライブラリ。シングルスレッドで動作し(not 並列処理 but 並行処理)、APIのリクエストなどCPUのリソースに空きが生じる、いわゆるI/Oバウンドな処理を効率的に捌くことができる。

asyncioにはコルーチンを中心とした高レベルAPIとイベントループ、Futureオブジェクトを中心とした低レベルAPIがあるが、高レベルAPIに焦点を当てて説明する。

流れ

基本的にはこれだけ。

  • 処理をコルーチンとして定義する
  • asyncio.create_taskにより非同期的に扱いたい処理をタスクとしてスケジュールする
  • asyncio.runによりタスクを非同期的に処理する

登場人物

イベントループ

スケジュールしたタスクを順番に処理するキューのような仕組み。高レベルAPIでは直接触ることはないが、asyncio.runを実行したときには裏でイベントループが働いている。

コルーチン

async defにより定義される関数をコルーチン関数という。通常の関数と異なり、呼び出すとコルーチンオブジェクトを返す。

>>> async def hello():
...     return "Hello!"
... 
>>> hello()
<coroutine object hello at 0x7fb36824fcc0>

通常の関数の内部で別の関数を呼び出すのと同様に、コルーチンも内部で別のコルーチンを呼ぶことができる。呼び出す際にはawait式を使う(ジェネレータをyield fromでチェインさせるのと同じようなノリ)。awaitにより評価されたコルーチンオブジェクトはコルーチン関数の戻り値を返す。

async def main():
    output = await hello()
    print(output)

asyncio.run(main())

# Hello!

Task

コルーチンが処理を表す設計図だとすると、Taskオブジェクトが実際にスケジュールされるタスクそのもの。asyncio.create_task(coroutine)を通してタスクをスケジュールし、対応するTaskオブジェクトを得る。

task = asyncio.create_task(hello())

Future

非同期処理の最終結果を表す低レベルオブジェクト。高レベルAPIを利用する限り直接的に触ることはないが、後述するようにこのFutureオブジェクトが非同期的な振る舞いを実現する要となっている。TaskオブジェクトはFutureオブジェクトのサブクラスとなっている。

コルーチンオブジェクト、Taskオブジェクト、Futureオブジェクトをまとめてawaitableオブジェクトという。

非同期処理の仕組み

awaitableオブジェクトは基本的にawait式と組み合わせて記述する。await awaitableでawaitableオブジェクトが完了するまで待つことを意味するが、オブジェクトの種類によって若干振る舞いが異なる。

await coroutine

(単に)コルーチンの処理を実行し、完了するまで待つ。

await future

Futureオブジェクトの完了まで制御を手放す(重要)。すなわちスケジュールされた他のタスクに手を移す。

つまり、await futureが複数のタスクを協調的に非同期処理する仕掛けになっている。(参考:python3 の async/awaitを理解する - Qiita

内部的には、コルーチンの一連の処理の中にawait futureがなければ最後まで一気に実行、await futureがあればそこで処理を中断、イベントループに再スケジュールして、のちほど処理を再開するという仕組みになってるらしい。(参考:Pythonの非同期通信(asyncioモジュール)入門を書きました - ゆくゆくは有へと

非同期関数

asyncio.sleep

最も基本的な非同期関数。指定した秒数処理の実行を止め、他のスケジュールされたタスクに制御を渡す。引数に0を与えることで、スリープせずにただ制御を手放すといった使い方も可能。

asyncio.sleep自体はコルーチンだが、内部でawait futureを呼び出しているため、このような非同期的な振る舞いが実現されている。

time.sleepは通常の関数であり実行中に制御を手放さないので、スリープ中に他のタスクを実行することはできない。

非同期処理の例

最も簡単な例

コルーチン内でasyncio.sleepを呼び出して制御を手放すことで、スケジュールした複数のタスクを非同期的に実行していることがわかる。

async def func(id):
    print(f"{id}: start at {datetime.now()}")
    await asyncio.sleep(1)  # 制御を手放す
    print(f"{id}: end at {datetime.now()}")

async def main():
    task1 = asyncio.create_task(func(id=1))
    task2 = asyncio.create_task(func(id=2))
    task3 = asyncio.create_task(func(id=3))

    print("START")
    await task1
    await task2
    await task3
    print("END")

asyncio.run(main())

# START
# 1: start at 2023-10-27 02:04:13.557857
# 2: start at 2023-10-27 02:04:13.557913
# 3: start at 2023-10-27 02:04:13.557942
# 1: end at 2023-10-27 02:04:14.558262
# 2: end at 2023-10-27 02:04:14.558327
# 3: end at 2023-10-27 02:04:14.558354
# END

コルーチンは、create_taskを渡した時点でタスクとしてスケジュールされ実行の開始を待機している状態になり、制御が移ると速やかに実行が開始される。await taskを呼んだタイミングで初めて処理が開始されるわけではないことに注意。

たとえば、await taskを呼ぶ前にasyncio.sleepを呼び、mainから制御を手放してみる。

async def func(id):
    print(f"{id}: start at {datetime.now()}")
    await asyncio.sleep(1)  # 制御を手放す
    print(f"{id}: end at {datetime.now()}")

async def main():
    task1 = asyncio.create_task(func(id=1))
    task2 = asyncio.create_task(func(id=2))
    task3 = asyncio.create_task(func(id=3))

    await asyncio.sleep(3)  # 追加

    print("START")
    await task1
    await task2
    await task3
    print("END")

asyncio.run(main())

# 1: start at 2023-10-27 02:04:25.895476
# 2: start at 2023-10-27 02:04:25.895530
# 3: start at 2023-10-27 02:04:25.895552
# 1: end at 2023-10-27 02:04:26.896737
# 2: end at 2023-10-27 02:04:26.896796
# 3: end at 2023-10-27 02:04:26.896819
# START
# END

このように、main内でawait taskを呼ぶ前に既にタスクの実行が完了していることがわかる。

作成したTaskオブジェクトは必ずawaitしないと、処理の完了が保証されないことにも注意。つまり、完了を待たずにイベントループが閉じてしまう可能性がある。

OpenAI APIで非同期処理

より実践的に非同期関数openai.ChatCompletion.acreateを使って非同期的にリクエストを送る例。リクエストの間隔は1秒あけるようにしている。time.sleepではなくasyncio.sleepを用いることで制御がmainからスケジュールされたタスクに移り、非同期的な処理が実現できる。

async def call_api(prompt):
    response = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response

async def main():
    tasks = []
    for _ in range(100):
        tasks.append(asyncio.create_task(call_api("some prompt")))
        await asyncio.sleep(1)
    [await task for task in tasks]

asyncio.run(main())

参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?