asyncioは非同期処理を実行するためのPython標準ライブラリ。シングルスレッドで動作し(not 並列処理 but 並行処理)、APIのリクエストなどCPUのリソースに空きが生じる、いわゆるI/Oバウンドな処理を効率的に捌くことができる。
asyncioにはコルーチンを中心とした高レベルAPIとイベントループ、Futureオブジェクトを中心とした低レベルAPIがあるが、高レベルAPIに焦点を当てて説明する。
流れ
基本的にはこれだけ。
- 処理をコルーチンとして定義する
-
asyncio.create_task
により非同期的に扱いたい処理をタスクとしてスケジュールする -
asyncio.run
によりタスクを非同期的に処理する
登場人物
イベントループ
スケジュールしたタスクを順番に処理するキューのような仕組み。高レベルAPIでは直接触ることはないが、asyncio.run
を実行したときには裏でイベントループが働いている。
コルーチン
async def
により定義される関数をコルーチン関数という。通常の関数と異なり、呼び出すとコルーチンオブジェクトを返す。
>>> async def hello():
... return "Hello!"
...
>>> hello()
<coroutine object hello at 0x7fb36824fcc0>
通常の関数の内部で別の関数を呼び出すのと同様に、コルーチンも内部で別のコルーチンを呼ぶことができる。呼び出す際にはawait式を使う(ジェネレータをyield from
でチェインさせるのと同じようなノリ)。await
により評価されたコルーチンオブジェクトはコルーチン関数の戻り値を返す。
async def main():
output = await hello()
print(output)
asyncio.run(main())
# Hello!
Task
コルーチンが処理を表す設計図だとすると、Taskオブジェクトが実際にスケジュールされるタスクそのもの。asyncio.create_task(coroutine)
を通してタスクをスケジュールし、対応するTaskオブジェクトを得る。
task = asyncio.create_task(hello())
Future
非同期処理の最終結果を表す低レベルオブジェクト。高レベルAPIを利用する限り直接的に触ることはないが、後述するようにこのFutureオブジェクトが非同期的な振る舞いを実現する要となっている。TaskオブジェクトはFutureオブジェクトのサブクラスとなっている。
コルーチンオブジェクト、Taskオブジェクト、Futureオブジェクトをまとめてawaitableオブジェクトという。
非同期処理の仕組み
awaitableオブジェクトは基本的にawait式と組み合わせて記述する。await awaitable
でawaitableオブジェクトが完了するまで待つことを意味するが、オブジェクトの種類によって若干振る舞いが異なる。
await coroutine
(単に)コルーチンの処理を実行し、完了するまで待つ。
await future
Futureオブジェクトの完了まで制御を手放す(重要)。すなわちスケジュールされた他のタスクに手を移す。
つまり、await future
が複数のタスクを協調的に非同期処理する仕掛けになっている。(参考:python3 の async/awaitを理解する - Qiita)
内部的には、コルーチンの一連の処理の中にawait future
がなければ最後まで一気に実行、await future
があればそこで処理を中断、イベントループに再スケジュールして、のちほど処理を再開するという仕組みになってるらしい。(参考:Pythonの非同期通信(asyncioモジュール)入門を書きました - ゆくゆくは有へと)
非同期関数
asyncio.sleep
最も基本的な非同期関数。指定した秒数処理の実行を止め、他のスケジュールされたタスクに制御を渡す。引数に0を与えることで、スリープせずにただ制御を手放すといった使い方も可能。
asyncio.sleep
自体はコルーチンだが、内部でawait future
を呼び出しているため、このような非同期的な振る舞いが実現されている。
time.sleep
は通常の関数であり実行中に制御を手放さないので、スリープ中に他のタスクを実行することはできない。
非同期処理の例
最も簡単な例
コルーチン内でasyncio.sleep
を呼び出して制御を手放すことで、スケジュールした複数のタスクを非同期的に実行していることがわかる。
async def func(id):
print(f"{id}: start at {datetime.now()}")
await asyncio.sleep(1) # 制御を手放す
print(f"{id}: end at {datetime.now()}")
async def main():
task1 = asyncio.create_task(func(id=1))
task2 = asyncio.create_task(func(id=2))
task3 = asyncio.create_task(func(id=3))
print("START")
await task1
await task2
await task3
print("END")
asyncio.run(main())
# START
# 1: start at 2023-10-27 02:04:13.557857
# 2: start at 2023-10-27 02:04:13.557913
# 3: start at 2023-10-27 02:04:13.557942
# 1: end at 2023-10-27 02:04:14.558262
# 2: end at 2023-10-27 02:04:14.558327
# 3: end at 2023-10-27 02:04:14.558354
# END
コルーチンは、create_task
を渡した時点でタスクとしてスケジュールされ実行の開始を待機している状態になり、制御が移ると速やかに実行が開始される。await task
を呼んだタイミングで初めて処理が開始されるわけではないことに注意。
たとえば、await task
を呼ぶ前にasyncio.sleep
を呼び、main
から制御を手放してみる。
async def func(id):
print(f"{id}: start at {datetime.now()}")
await asyncio.sleep(1) # 制御を手放す
print(f"{id}: end at {datetime.now()}")
async def main():
task1 = asyncio.create_task(func(id=1))
task2 = asyncio.create_task(func(id=2))
task3 = asyncio.create_task(func(id=3))
await asyncio.sleep(3) # 追加
print("START")
await task1
await task2
await task3
print("END")
asyncio.run(main())
# 1: start at 2023-10-27 02:04:25.895476
# 2: start at 2023-10-27 02:04:25.895530
# 3: start at 2023-10-27 02:04:25.895552
# 1: end at 2023-10-27 02:04:26.896737
# 2: end at 2023-10-27 02:04:26.896796
# 3: end at 2023-10-27 02:04:26.896819
# START
# END
このように、main
内でawait task
を呼ぶ前に既にタスクの実行が完了していることがわかる。
作成したTaskオブジェクトは必ずawaitしないと、処理の完了が保証されないことにも注意。つまり、完了を待たずにイベントループが閉じてしまう可能性がある。
OpenAI APIで非同期処理
より実践的に非同期関数openai.ChatCompletion.acreate
を使って非同期的にリクエストを送る例。リクエストの間隔は1秒あけるようにしている。time.sleep
ではなくasyncio.sleep
を用いることで制御がmain
からスケジュールされたタスクに移り、非同期的な処理が実現できる。
async def call_api(prompt):
response = await openai.ChatCompletion.acreate(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response
async def main():
tasks = []
for _ in range(100):
tasks.append(asyncio.create_task(call_api("some prompt")))
await asyncio.sleep(1)
[await task for task in tasks]
asyncio.run(main())