前置き
業務で大量のWeb APIを並行で投げる必要があり、良い方法がないか探したところ、asyncio が良いソリューションだったため勉強した。勉強したことをまとめて、アウトプットするということで情報をここで公開する。これから asyncio を学習する人の役に立つとうれしい。
想定読者
- 並列・並行処理の概念はわかっているが、asyncio の処理がいまいち理解できてないので理解したい人
検証環境
- Python 3.9.4
- Jupyter Notebook 上で検証
asyncio とは
asyncio (→公式ドキュメント) とはざっくり言ってしまうと、async/await
構文を使ってシングルスレッドで並行処理を行うためのライブラリ。
Python には並列処理・並行処理を行うためのライブラリがいくつかあるが、シングルスレッドで動くのがポイント。下記図のように、あるライブラリではマルチプロセスで動作し並列処理を行い、また別のライブラリではマルチスレッドで動作して並行処理を行う。
上記図で示している通り、Pythonではマルチスレッドとシングルスレッドでは扱えるCPUに差がないため、処理速度向上の点であまり差がない。強いて違いを挙げるとすると、マルチスレッドによる並行処理では実行するスレッドをGILが管理するため、実装者は実行の切り替わりをコントロールできないが、asyncioを利用したシングルスレッドによる並行処理はawait
式によってある程度切り替わりをコントロールできる点。加えて、スレッドを立ち上げで管理する分、マルチスレッドのほうが処理コストが大きい可能性がある。マルチスレッドで扱えるCPUが増えないのはPythonのGILによるものだが、ここでは説明を割愛する。
この記事ではシングルスレッドで並行処理を行う asyncio を解説する。
asyncioによる並行処理の概念
ざっくり言うと、下記図のように複数のコルーチン(後続で説明)をそれぞれブロック化して、イベントループなる実行機構がブロックを全部消費するまでIteratorのように1つ1つブロックを順番を管理しながら処理していくのがasyncioによる並行処理のイメージになる。
このコルーチンを作るのが async
で、ブロックを区切っていくのが await
になる。
asyncio は async
と await
の挙動を理解できればほぼ理解できたと言っても過言ではない。
async
とawait
async
async
とはコルーチンを作る文である。**コルーチンとは、中断・再開ができる処理の一連(ルーチン)**である。補足としてルーチンには、メインルーチン、サブルーチン、コルーチンがあり、関係を図示すると以下の図のようになる。
Pythonにおけるコルーチンは関数のように定義できるが、通常のように関数呼び出しでは実行できない。
async def coroutine():
print('コルーチンです。コンニチハ!')
return
coroutine()
<coroutine object coroutine at 0x0000028BA8FFD0C0>
コルーチンを実行するには実行機構(イベントループ)が必要になる。実行するための高レベルAPIとして asyncio.run()
があるので、基本それを使うことになる。
asyncio.run(coroutine())
コルーチンです。コンニチハ!
注意: Jupyter Notebookではイベントループがすでに立ち上がっており、各セルはイベントループ内で実行されるため、 asyncio.run()
が使えず、代わりに await coroutine()
のように await
を使う。入れ子のようにイベントループ内でイベントループを作ることができないためである。
await
await
は await
の対象となる Awaitable オブジェクトの実行結果が返ってくるまで、await
が宣言された実行中のコルーチンを一時停止させ、計算リソースを手放す式である。ざっくり await
のイメージを説明すると、コルーチンの内部を await
の前と後で分割してブロック化する感じになる。例えば以下のようなコルーチンがあった場合、①で await
が実行されるとコルーチンである say_after
が①の処理が終わるまで一時停止し、計算リソースを手放す。空いた計算リソースはイベントループが管理しているReady(実行開始可能)なコルーチンに割り当てられる。
async def say_after(delay, what):
print(f'before_sleep: {what}')
await asyncio.sleep(delay) # --①
print(what)
await
は実行中のコルーチンを一時停止させる式なので、コルーチンの中でしか宣言できない。また式なので、実行結果を =
で変数に代入することができる。
Awaitableオブジェクトはコルーチン、Task
(後述)、Future
の3種があるが、基本的にコルーチンとTask
だけ覚えておけばよい。Future
はTask
の低レベルAPIであるためである。
実際に await
挙動を確認するため、以下のコードの実行を例に実行処理順を解説する。
async def say_after(delay, what):
print(f'before_sleep: {what}')
await asyncio.sleep(delay)
print(what)
async def execute():
print(f'started at {time.strftime("%X")}')
await say_after(1, 'hello')
await say_after(2, 'world')
print(f'ended at {time.strftime("%X")}')
asyncio.run(execute())
started at 17:56:57
before_sleep: hello
hello
before_sleep: world
world
ended at 17:57:00
ポイントは並行処理されていない点にある (1+2秒で3秒掛かってる)。async/await
構文を使っただけでは並行処理されない。await
の挙動により実際の実行処理は以下の図の通りになる。
並行処理がされなかったのは、sleep
中にReadyなコルーチンがなかったためである。基本的にコルーチンはawait
されて初めてReadyになるため、say_after(2, 'world')
は並行して実行されなかった。
asyncioで並行処理を行うためには、await
をせずにコルーチンをReadyな状態にする機能が必要になる。それが次の章で説明するTask
である。
Task
とTask
による並行処理
Task
とはコルーチンをReadyの状態にし、実行状態と結果を管理するものである。Task
はasyncio.create_task(coroutine())
で作成できる。Ready状態にするだけなので、Task
を作っただけではコルーチンは実行されない。どこかのコルーチンが一時停止状態になって計算リソースが開放されて初めて処理が実行される。
実際にTask
の挙動を確認するため、以下のコードの実行を例に実行処理順を解説する。
async def say_after(delay, what):
print(f'before_sleep: {what}')
await asyncio.sleep(delay)
print(what)
async def execute2():
print(f'started at {time.strftime("%X")}')
task = asyncio.create_task(say_after(2, 'world'))
await say_after(1, 'hello')
await task
print(f'ended at {time.strftime("%X")}')
asyncio.run(execute2())
started at 18:51:38
before_sleep: hello
before_sleep: world
hello
world
ended at 18:51:40
ポイントはsay_after()
が並行処理されている点である(先ほどとは違い2秒しかかかってない)。Task
の挙動により実際の実行処理は以下の図の通りになる。
このようにTask
を作成すると、コルーチンをReadyの状態にし、計算リソースが開放されると処理が割り当てられるようになる。ただし、コルーチンのReady状態になる順番通りに計算リソースが割り当てられるわけではなく、イベントループの処理ロジックによって割り当てされるため実行順序には注意されたい(経験上、おおよそReady状態になった順番通りにリソースが割り当てられる)。
asyncioによる並行処理は基本的にこのTask
を利用することになる。並行化したいものはとりあえずTask
を作成し、それをawait
すればよい。
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({number}), currently i={i}...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
return f
async def execute():
task_a = asyncio.create_task(factorial('A', 2))
task_b = asyncio.create_task(factorial('B', 3))
task_c = asyncio.create_task(factorial('C', 4))
L = []
L.append(await task_a)
L.append(await task_b)
L.append(await task_c)
print(L)
asyncio.run(execute())
Task A: Compute factorial(2), currently i=2...
Task B: Compute factorial(3), currently i=2...
Task C: Compute factorial(4), currently i=2...
Task A: factorial(2) = 2
Task B: Compute factorial(3), currently i=3...
Task C: Compute factorial(4), currently i=3...
Task B: factorial(3) = 6
Task C: Compute factorial(4), currently i=4...
Task C: factorial(4) = 24
[2, 6, 24]
ただし、Task
を作ってawait
をするやり方だと、Task
が多くなったときにボイラープレートなコードが増えてしまう(create_task(...), create_task(...)
, await task1, await task2 ...
)。これを解決するやり方としてasyncioは高レベルAPIとしてasyncio.gather()
というAPIを提供している。
asyncio.gather()
を使うと上記コードは以下のように書き換えられる。
async def execute2():
# Schedule three calls *concurrently*:
L = await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
print(L)
多くのTask
を扱う場合はこちらを使うとよい。
asyncioを利用したWebAPIリクエストの並行化
もし多くのWebAPIを並行でリクエストしたい場合は、リクエストAPIがコルーチンで実装されたaiohttp
(→公式ドキュメント)ライブラリを活用するとよい。Pythonでデファクトとして利用される requests
はコルーチンで実装されてないため、並行化ができないためである。実装例は以下の参考文献を参照されたい。
まとめ
- asyncioは
async
とawait
を理解すればさほど難しくない - 大抵のことは公式ドキュメントに載っている
(え?いまさら asyncio とか遅くない?という批評は重々承知してます・・・)
(もし間違っている部分があったら直すのでコメントいただけると幸いです)