2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonの非同期IOを学ぶ

Last updated at Posted at 2025-11-30

この記事はZennに投稿した内容をアドベントカレンダー用に転記したものです。

概要

Pythonプログラムの性能を向上させる手段のひとつとして非同期IOに対応させることがあります。
非同期IOはノンブロッキングIOとイベントループでIOバウンドな処理の性能を向上させられるものです。

私がもっとも慣れ親しんだ言語はJavaでして、Pythonは習熟しているとは言い難いです。
今回は非同期IOのためのAPIの中からよく使いそうなものを選び、学んでみようと思います。

非同期IOの基本

コルーチン

asyncを使うことでコルーチン関数を定義できます。

async def foo() -> None:
    ...

コルーチン関数は単に呼び出しただけでは実行されません。
awaitキーワードを付けて実行されるようにします。

# 単に呼び出しただけでは実行されない(警告が出力される)
foo()
# awaitキーワードを付けることで実行される
await foo()

イベントループ

asyncio.run()へ後述するコルーチンを渡すことで、イベントループでコルーチンを実行して結果を返します。

async def main() -> None:
    ...

asyncio.run(main())

プログラム全体を非同期IOへ対応させる場合、main関数をコルーチン関数にしてasyncio.run()へ渡すのが一般的です。

タスクの作成

前述の通りコルーチン関数は単に呼び出しただけでは実行されません。

次のコードはコルーチン関数foobarを並列に実行させようとしていますが、fbはどちらもawaitで実行が開始され完了まで待機されるため、直列に実行されてしまいます。

f = foo()
b = bar()
await f
await b

asyncio.create_task()を使えばawaitキーワードを用いなくても実行が開始されます。
先ほどのコード例は次のように修正することで期待通りに動作します。

f = asyncio.create_task(foo())
b = asyncio.create_task(bar())
await f
await b

キュー

asyncio.Queueはコルーチン間でデータを受け渡しできるキューです。

ここではProducer-Consumerパターンでの利用を前提にしています。

queue = asyncio.Queue[str]()

put()でエンキューします。

await queue.put("foobar")

get()でデキューします。
受け取ったアイテムを処理したらtask_done()を呼び出します。

item = await queue.get()

...

queue.task_done()

asyncio.Queueは内部に未完了タスクのカウンターを持っています。
カウンターはアイテムをエンキューするたびにインクリメントされ、task_done()を呼び出されるたびにデクリメントされます。

未完了タスクのカウンターはjoin()で参照されます。
join()はキュー内のすべてのアイテムが取得され、処理されるまで待機します。

await queue.join()

前述のコード例ではasyncio.Queueの型変数にstrを指定していました。
Poison PillパターンでConsumerへ完了を通知したい場合、Noneとのユニオン型を指定するのが簡単です。

queue = asyncio.Queue[str | None]()

# 通常のアイテム
await queue.put("foobar")

# すべてのアイテムを渡しきってConsumerを終了させたい場合
await queue.put(None)

Consumer側ではNoneを受け取ったら自身を終了させます。

async def consumer(queue: asyncio.Queue[int | None]) -> None:
    while True:
        item = await queue.get()
        try:
            if item is None:
                return

            ...

        finally:
            queue.task_done()

同期プリミティブ

複数個のコルーチンを協調させるためのものです。

用意されているものはいずれも基本的でシンプルなものなので、公式ドキュメントを読めば容易に理解できそうです。

その他

あと覚えておくべきAPIとしてasyncio.gather()があります。
これは複数個のコルーチン/タスクを並列実行して結果のリストを返します。

# 前提: 次のようなコルーチン関数が定義されている
#
# async def coro(id: str) -> int:
#     ...

foo = coro("foo")
bar = coro("bar")
baz = coro("baz")

results = await asyncio.gather(foo, bar, baz)

このコード例は次のコード例と概ね同様に動作します。

# 前提: 次のようなコルーチン関数が定義されている
#
# async def coro(id: str) -> int:
#     ...

foo = coro("foo")
bar = coro("bar")
baz = coro("baz")

foo_task = asyncio.create_task(foo)
bar_task = asyncio.create_task(bar)
baz_task = asyncio.create_task(baz)

results = [
    await foo_task,
    await bar_task,
    await baz_task,
]

ユースケース

ここからはいくつかのユースケースを挙げて、非同期IOに対応させるコード例を示します。

Producer-Consumerパターン

n個のConsumerがキューからアイテムを取り出して並列に処理します。
並列数はConsumerの数で制御します。

次のコード例はデキューしたアイテムをログ出力したあと1秒間待機するConsumerを3並列で動かしています。

import asyncio
import logging

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s [%(name)s] (%(taskName)s) %(message)s"
)

item_size = 50
parallels = 3
wait_seconds = 1


async def consumer(name: str, queue: asyncio.Queue[int | None]) -> None:
    logger = logging.getLogger(name)
    while True:
        item = await queue.get()
        try:
            if item is None:
                return
            logger.info("item = %s", item)
            await asyncio.sleep(wait_seconds)
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue[int | None]()

    # 複数個のConsumerを作る
    consumers = [
        asyncio.create_task(consumer(f"consumer-{i + 1}", queue))
        for i in range(parallels)
    ]

    logger = logging.getLogger("producer")
    for i in range(item_size):
        item = i + 1
        logger.info("item = %s", item)
        await queue.put(item)

    # Poison PillパターンのためConsumerの数だけNoneをエンキューする
    for _ in range(parallels):
        logger.info("item = None")
        await queue.put(None)

    await asyncio.gather(*consumers)


if __name__ == "__main__":
    asyncio.run(main())

Producerがすべてのアイテムを一息でエンキューしていますが、キューにサイズを設定することで少しずつエンキューするように制御できます。

queue = asyncio.Queue[int | None](maxsize=10)

大量のタスクを少しずつ処理する

あらかじめアイテムの数が分かっている場合はProducer-Consumerパターンを使わなくても処理できます。

次のコード例は50個のアイテムを処理するタスクを作成して最大3並列で実行します。
同期プリミティブのasyncio.Semaphoreで並列数を制御しています。

import asyncio
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s (%(taskName)s) %(message)s")

task_size = 50
parallels = 3
wait_seconds = 1


async def create_task(semaphore: asyncio.Semaphore, item: int) -> None:
    async with semaphore:
        logging.info("%d", item)
        await asyncio.sleep(wait_seconds)


async def main():
    semaphore = asyncio.Semaphore(parallels)
    tasks = [create_task(semaphore, i) for i in range(task_size)]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

まとめ

Pythonの非同期IOを少しは知れました。

注意深くコードを書かないとコルーチンをawaitし忘れて実行されなかったり、並列で実行しているつもりが直列になってしまいそうで、それなりに高難度であるという感想を持ちました。

また、コードを書いてしまって後から非同期IOへ対応しようとすると、プログラム全体にasync/awaitを書いて回らないといけない状況になり得るので、最初から問答無用で非同期IOへ対応しておくのが無難だと思いました。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?