この記事はZennに投稿した内容をアドベントカレンダー用に転記したものです。
概要
Pythonプログラムの性能を向上させる手段のひとつとして非同期IOに対応させることがあります。
非同期IOはノンブロッキングIOとイベントループでIOバウンドな処理の性能を向上させられるものです。
私がもっとも慣れ親しんだ言語はJavaでして、Pythonは習熟しているとは言い難いです。
今回は非同期IOのためのAPIの中からよく使いそうなものを選び、学んでみようと思います。
非同期IOの基本
コルーチン
asyncを使うことでコルーチン関数を定義できます。
async def foo() -> None:
...
コルーチン関数は単に呼び出しただけでは実行されません。
awaitキーワードを付けて実行されるようにします。
# 単に呼び出しただけでは実行されない(警告が出力される)
foo()
# awaitキーワードを付けることで実行される
await foo()
イベントループ
asyncio.run()へ後述するコルーチンを渡すことで、イベントループでコルーチンを実行して結果を返します。
async def main() -> None:
...
asyncio.run(main())
プログラム全体を非同期IOへ対応させる場合、main関数をコルーチン関数にしてasyncio.run()へ渡すのが一般的です。
タスクの作成
前述の通りコルーチン関数は単に呼び出しただけでは実行されません。
次のコードはコルーチン関数fooとbarを並列に実行させようとしていますが、fとbはどちらもawaitで実行が開始され完了まで待機されるため、直列に実行されてしまいます。
f = foo()
b = bar()
await f
await b
asyncio.create_task()を使えばawaitキーワードを用いなくても実行が開始されます。
先ほどのコード例は次のように修正することで期待通りに動作します。
f = asyncio.create_task(foo())
b = asyncio.create_task(bar())
await f
await b
キュー
asyncio.Queueはコルーチン間でデータを受け渡しできるキューです。
ここではProducer-Consumerパターンでの利用を前提にしています。
queue = asyncio.Queue[str]()
put()でエンキューします。
await queue.put("foobar")
get()でデキューします。
受け取ったアイテムを処理したらtask_done()を呼び出します。
item = await queue.get()
...
queue.task_done()
asyncio.Queueは内部に未完了タスクのカウンターを持っています。
カウンターはアイテムをエンキューするたびにインクリメントされ、task_done()を呼び出されるたびにデクリメントされます。
未完了タスクのカウンターはjoin()で参照されます。
join()はキュー内のすべてのアイテムが取得され、処理されるまで待機します。
await queue.join()
前述のコード例ではasyncio.Queueの型変数にstrを指定していました。
Poison PillパターンでConsumerへ完了を通知したい場合、Noneとのユニオン型を指定するのが簡単です。
queue = asyncio.Queue[str | None]()
# 通常のアイテム
await queue.put("foobar")
# すべてのアイテムを渡しきってConsumerを終了させたい場合
await queue.put(None)
Consumer側ではNoneを受け取ったら自身を終了させます。
async def consumer(queue: asyncio.Queue[int | None]) -> None:
while True:
item = await queue.get()
try:
if item is None:
return
...
finally:
queue.task_done()
同期プリミティブ
複数個のコルーチンを協調させるためのものです。
用意されているものはいずれも基本的でシンプルなものなので、公式ドキュメントを読めば容易に理解できそうです。
その他
あと覚えておくべきAPIとしてasyncio.gather()があります。
これは複数個のコルーチン/タスクを並列実行して結果のリストを返します。
# 前提: 次のようなコルーチン関数が定義されている
#
# async def coro(id: str) -> int:
# ...
foo = coro("foo")
bar = coro("bar")
baz = coro("baz")
results = await asyncio.gather(foo, bar, baz)
このコード例は次のコード例と概ね同様に動作します。
# 前提: 次のようなコルーチン関数が定義されている
#
# async def coro(id: str) -> int:
# ...
foo = coro("foo")
bar = coro("bar")
baz = coro("baz")
foo_task = asyncio.create_task(foo)
bar_task = asyncio.create_task(bar)
baz_task = asyncio.create_task(baz)
results = [
await foo_task,
await bar_task,
await baz_task,
]
ユースケース
ここからはいくつかのユースケースを挙げて、非同期IOに対応させるコード例を示します。
Producer-Consumerパターン
n個のConsumerがキューからアイテムを取り出して並列に処理します。
並列数はConsumerの数で制御します。
次のコード例はデキューしたアイテムをログ出力したあと1秒間待機するConsumerを3並列で動かしています。
import asyncio
import logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(name)s] (%(taskName)s) %(message)s"
)
item_size = 50
parallels = 3
wait_seconds = 1
async def consumer(name: str, queue: asyncio.Queue[int | None]) -> None:
logger = logging.getLogger(name)
while True:
item = await queue.get()
try:
if item is None:
return
logger.info("item = %s", item)
await asyncio.sleep(wait_seconds)
finally:
queue.task_done()
async def main():
queue = asyncio.Queue[int | None]()
# 複数個のConsumerを作る
consumers = [
asyncio.create_task(consumer(f"consumer-{i + 1}", queue))
for i in range(parallels)
]
logger = logging.getLogger("producer")
for i in range(item_size):
item = i + 1
logger.info("item = %s", item)
await queue.put(item)
# Poison PillパターンのためConsumerの数だけNoneをエンキューする
for _ in range(parallels):
logger.info("item = None")
await queue.put(None)
await asyncio.gather(*consumers)
if __name__ == "__main__":
asyncio.run(main())
Producerがすべてのアイテムを一息でエンキューしていますが、キューにサイズを設定することで少しずつエンキューするように制御できます。
queue = asyncio.Queue[int | None](maxsize=10)
大量のタスクを少しずつ処理する
あらかじめアイテムの数が分かっている場合はProducer-Consumerパターンを使わなくても処理できます。
次のコード例は50個のアイテムを処理するタスクを作成して最大3並列で実行します。
同期プリミティブのasyncio.Semaphoreで並列数を制御しています。
import asyncio
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s (%(taskName)s) %(message)s")
task_size = 50
parallels = 3
wait_seconds = 1
async def create_task(semaphore: asyncio.Semaphore, item: int) -> None:
async with semaphore:
logging.info("%d", item)
await asyncio.sleep(wait_seconds)
async def main():
semaphore = asyncio.Semaphore(parallels)
tasks = [create_task(semaphore, i) for i in range(task_size)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
まとめ
Pythonの非同期IOを少しは知れました。
注意深くコードを書かないとコルーチンをawaitし忘れて実行されなかったり、並列で実行しているつもりが直列になってしまいそうで、それなりに高難度であるという感想を持ちました。
また、コードを書いてしまって後から非同期IOへ対応しようとすると、プログラム全体にasync/awaitを書いて回らないといけない状況になり得るので、最初から問答無用で非同期IOへ対応しておくのが無難だと思いました。