0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Python の asyncio.Queue の使い方

Last updated at Posted at 2025-04-13

Python の asyncio.Queue の使い方のメモ。

やりたいこと

  • queue からアイテムを取得して処理するタスクを実行。
  • queue が空になった場合にどのようにタスクを終了させるかの検証。

queue.get() の場合

queue.get() で queue からアイテムを取得する場合、queue が空になっても待ち続けます。
queue.empty() で queue が空になったことはわかりますが、複数の task が queue.get() を実行する場合、すでに get() を呼び出した task は queue が空になったことがわかりません。

task.cancel()

task.cancel() を実行することで、queue が空になった際に task を終了させることができます。

import random
import asyncio


async def consumer(name, queue):
    """Consume items from ``queue`` until this task is cancelled.

    Each item is printed and followed by a short random sleep that stands in
    for real work.  ``queue.task_done()`` is called only after that work has
    ended, so ``queue.join()`` returns once every item has been processed
    rather than merely fetched.

    Args:
        name: Label used as the prefix of every printed message.
        queue: ``asyncio.Queue`` to take items from.

    Returns:
        The number of items taken from the queue at the moment the task is
        cancelled, or ``None`` if an unexpected exception ended the loop.
    """
    cnt = 0
    while True:
        try:
            item = await queue.get()
            try:
                cnt += 1

                print(f"{name}: {item}")

                # Simulated processing time.
                await asyncio.sleep(random.uniform(0.1, 0.2))
            finally:
                # Balance the get() even when processing is interrupted, so
                # queue.join() can never hang on this item.
                queue.task_done()

        except asyncio.CancelledError:
            # Cancellation is how the caller stops an idle consumer; report
            # the count instead of propagating the cancellation.
            print(f"cancelled: {name}")
            return cnt

        except Exception as e:
            print(f"exception: {e}")
            return None


def tasks_state(tasks, result=False):
    """Print the name and completion flag of every task in ``tasks``.

    Args:
        tasks: Iterable of ``asyncio.Task`` objects.
        result: When true, also print each task's ``result()``.
    """
    for t in tasks:
        print(f"*** {t.get_name()} ***")
        print(f"done: {t.done()}")
        if not result:
            continue
        print(f"result: {t.result()}")


async def main():
    """Fill a queue, drain it with three consumer tasks, then cancel them.

    Returns:
        Always ``0``.
    """
    # Producer: enqueue the string forms of 0..9 up front.
    queue = asyncio.Queue()
    for value in range(10):
        await queue.put(str(value))

    # Consumers: one named task per worker.
    labels = [f"c{idx}" for idx in range(3)]
    tasks = [
        asyncio.create_task(consumer(label, queue), name=label)
        for label in labels
    ]

    # Wait until every queued item has been marked done.
    tasks_state(tasks)
    await queue.join()
    print("joined")

    # Cancel the consumers one at a time, waiting for each to finish.
    for worker in tasks:
        worker.cancel()
        await worker

    tasks_state(tasks, True)

    return 0


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

実行例

*** c0 ***
done: False
*** c1 ***
done: False
*** c2 ***
done: False
c0: 0
c1: 1
c2: 2
c2: 3
c0: 4
c1: 5
c0: 6
c1: 7
c2: 8
c0: 9
joined
cancelled: c0
cancelled: c1
cancelled: c2
*** c0 ***
done: True
result: 4
*** c1 ***
done: True
result: 3
*** c2 ***
done: True
result: 3

queue.shutdown()

Python >= 3.13 では Queue.shutdown() で queue を shutdown すると、queue.put() や、queue が空になった後の queue.get() で asyncio.QueueShutDown が発生します。
QueueShutDown を捕捉することで、get() 待ちから抜けることができます。

import random
import asyncio


async def consumer(name, queue):
    """Consume items from ``queue`` until the queue is shut down.

    Each item is printed and followed by a short random sleep that stands in
    for real work.  ``queue.task_done()`` is called only after that work has
    ended, so ``queue.join()`` returns once every item has been processed
    rather than merely fetched.

    Requires Python >= 3.13 (``asyncio.QueueShutDown``).

    Args:
        name: Label used as the prefix of every printed message.
        queue: ``asyncio.Queue`` to take items from.

    Returns:
        The number of items taken from the queue when ``QueueShutDown`` is
        raised, or ``None`` if an unexpected exception ended the loop.
    """
    cnt = 0
    while True:
        try:
            item = await queue.get()
            try:
                cnt += 1

                print(f"{name}: {item}")

                # Simulated processing time.
                await asyncio.sleep(random.uniform(0.1, 0.2))
            finally:
                # Balance the get() even when processing is interrupted, so
                # queue.join() can never hang on this item.
                queue.task_done()

        except asyncio.QueueShutDown:
            # The queue was shut down: stop and report the count.
            print(f"shutdown: {name}")
            return cnt

        except Exception as e:
            print(f"exception: {e}")
            return None


def tasks_state(tasks, result=False):
    """Report each task's name and whether it has finished.

    Args:
        tasks: Iterable of ``asyncio.Task`` objects.
        result: When true, the value of ``task.result()`` is printed as well.
    """
    for job in tasks:
        header = f"*** {job.get_name()} ***"
        finished = job.done()
        print(header)
        print(f"done: {finished}")
        if result:
            print(f"result: {job.result()}")


async def main():
    """Fill a queue, drain it with three consumer tasks, then shut it down.

    Returns:
        Always ``0``.
    """
    # Producer: enqueue the string forms of 0..9 up front.
    queue = asyncio.Queue()
    for value in range(10):
        await queue.put(str(value))

    # Consumers: one named task per worker.
    labels = [f"c{idx}" for idx in range(3)]
    tasks = [
        asyncio.create_task(consumer(label, queue), name=label)
        for label in labels
    ]

    # Wait until every queued item has been marked done.
    tasks_state(tasks)
    await queue.join()

    # Shut the queue down (Python >= 3.13).
    queue.shutdown()

    # Wait for each consumer to finish.
    for worker in tasks:
        await worker

    tasks_state(tasks, True)

    return 0


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

実行例

*** c0 ***
done: False
*** c1 ***
done: False
*** c2 ***
done: False
c0: 0
c1: 1
c2: 2
c1: 3
c2: 4
c0: 5
c1: 6
c2: 7
c0: 8
c1: 9
shutdown: c2
shutdown: c0
shutdown: c1
*** c0 ***
done: True
result: 3
*** c1 ***
done: True
result: 4
*** c2 ***
done: True
result: 3

queue.get_nowait() を使う場合

queue.get_nowait() は queue からアイテムを取得できる場合はアイテムを返し、空の場合には asyncio.QueueEmpty の例外を発生させます。

import random
import asyncio


async def consumer(name, queue):
    """Drain ``queue`` without blocking and stop as soon as it is empty.

    Every fetched item is marked done immediately, printed, and followed by a
    short random sleep.

    Args:
        name: Label used as the prefix of every printed message.
        queue: ``asyncio.Queue`` to take items from.

    Returns:
        The number of items this consumer took from the queue.
    """
    processed = 0
    while True:
        try:
            entry = queue.get_nowait()
        except asyncio.QueueEmpty:
            # Nothing left to fetch: this consumer is finished.
            return processed

        queue.task_done()
        print(f"{name}: {entry}")
        processed += 1

        await asyncio.sleep(random.uniform(0.1, 0.2))


def tasks_state(tasks, result=False):
    """Dump the name and done-flag of each task, optionally with its result.

    Args:
        tasks: Iterable of ``asyncio.Task`` objects.
        result: When true, ``task.result()`` is printed after the done-flag.
    """
    for entry in tasks:
        print(f"*** {entry.get_name()} ***", f"done: {entry.done()}", sep="\n")
        if result:
            print(f"result: {entry.result()}")


async def main():
    """Fill a queue, drain it with three non-blocking consumers, show results.

    Returns:
        Always ``0``.
    """
    # Queue: enqueue the string forms of 0..9 up front.
    queue = asyncio.Queue()
    for value in range(10):
        await queue.put(str(value))

    # Consumer tasks: one named task per worker.
    labels = [f"c{idx}" for idx in range(3)]
    tasks = [
        asyncio.create_task(consumer(label, queue), name=label)
        for label in labels
    ]

    print("\nbefore join")
    tasks_state(tasks)

    await queue.join()

    print("\nafter join")
    tasks_state(tasks)

    # Collect each consumer's return value, in task order.
    results = []
    for worker in tasks:
        results.append(await worker)

    print("\nresults")
    tasks_state(tasks, True)
    print(results)

    return 0


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

実行例

before join
*** c0 ***
done: False
*** c1 ***
done: False
*** c2 ***
done: False
c0: 0
c1: 1
c2: 2
c2: 3
c0: 4
c1: 5
c2: 6
c0: 7
c1: 8
c2: 9

after join
*** c0 ***
done: False
*** c1 ***
done: False
*** c2 ***
done: False

results
*** c0 ***
done: True
result: 3
*** c1 ***
done: True
result: 3
*** c2 ***
done: True
result: 4
[3, 3, 4]
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?