Python の asyncio.Queue の使い方のメモ。
やりたいこと
- queue からアイテムを取得して処理するタスを実行。
- queue が空になった場合にどのようにタスクを終了させるかの検証。
queue.get() の場合
queue.get() で queue からアイテムを取得する場合、queue が空になっても待ち続けます。
queue.empty() で queue が空になったことはわかりますが、複数の task が queue.get() を実行する場合、すでに get() を呼び出した task は queue が空になったことがわかりません。
task.cancel()
task.cancel() を実行することで、queue が空になった際に task を終了させることができます。
import random
import asyncio
async def consumer(name, queue):
cnt = 0
while True:
try:
item = await queue.get()
queue.task_done()
cnt += 1
print(f"{name}: {item}")
await asyncio.sleep(random.uniform(0.1, 0.2))
except asyncio.CancelledError as e:
print(f"cancelled: {name}")
return cnt
except Exception as e:
print(f"exception: {e}")
return None
print(f"consumer({name}): end {cnt}")
return cnt
def tasks_state(tasks, result=False):
for task in tasks:
print(f"*** {task.get_name()} ***")
print(f"done: {task.done()}")
if result:
print(f"result: {task.result()}")
async def main():
# producer
queue = asyncio.Queue()
n = 10
for i in range(n):
await queue.put(f"{i}")
# consumer
tasks = []
num_tasks = 3
for i in range(num_tasks):
name = f"c{i}"
task = asyncio.create_task(consumer(name, queue), name=name)
tasks.append(task)
# queue が空になるまで待ち
tasks_state(tasks)
await queue.join()
print(f"joined")
# task を cancel
for i in range(num_tasks):
tasks[i].cancel()
await tasks[i]
tasks_state(tasks, True)
return 0
asyncio.run(main())
実行例
*** c0 ***
done: False
*** c1 ***
done: False
*** c2 ***
done: False
c0: 0
c1: 1
c2: 2
c2: 3
c0: 4
c1: 5
c0: 6
c1: 7
c2: 8
c0: 9
joined
cancelled: c0
cancelled: c1
cancelled: c2
*** c0 ***
done: True
result: 4
*** c1 ***
done: True
result: 3
*** c2 ***
done: True
result: 3
queue.shutdown()
Python >= 3.13 では Queue.shutdown() で queue を shutdown すると、queue.get() などで asyncio.QueueShutDown が発生します。
QueueShutDown を捕捉することで、get() 待ちから抜けることができます。
import random
import asyncio
async def consumer(name, queue):
cnt = 0
while True:
try:
item = await queue.get()
queue.task_done()
cnt += 1
print(f"{name}: {item}")
await asyncio.sleep(random.uniform(0.1, 0.2))
except asyncio.QueueShutDown as e:
print(f"shutdown: {name}")
return cnt
except Exception as e:
print(f"exception: {e}")
return None
print(f"consumer({name}): end {cnt}")
return cnt
def tasks_state(tasks, result=False):
for task in tasks:
print(f"*** {task.get_name()} ***")
print(f"done: {task.done()}")
if result:
print(f"result: {task.result()}")
async def main():
# producer
queue = asyncio.Queue()
n = 10
for i in range(n):
await queue.put(f"{i}")
# consumer
tasks = []
num_tasks = 3
for i in range(num_tasks):
name = f"c{i}"
task = asyncio.create_task(consumer(name, queue), name=name)
tasks.append(task)
# queue が空になるまで待ち
tasks_state(tasks)
await queue.join()
# queue を shutdown (>= 3.13)
queue.shutdown()
# consumer の終了待ち
for i in range(num_tasks):
await tasks[i]
tasks_state(tasks, True)
return 0
asyncio.run(main())
実行例
*** c0 ***
done: False
*** c1 ***
done: False
*** c2 ***
done: False
c0: 0
c1: 1
c2: 2
c1: 3
c2: 4
c0: 5
c1: 6
c2: 7
c0: 8
c1: 9
shutdown: c2
shutdown: c0
shutdown: c1
*** c0 ***
done: True
result: 3
*** c1 ***
done: True
result: 4
*** c2 ***
done: True
result: 3
queue.get_nowait() を使う場合
queue.get_nowait() は queue からアイテムを取得できる場合はアイテムを返し、空の場合には asyncio.QueueEmpty の例外を発生させます。
mport random
import asyncio
async def consumer(name, queue):
cnt = 0
while True:
try:
item = queue.get_nowait()
queue.task_done()
print(f"{name}: {item}")
cnt += 1
await asyncio.sleep(random.uniform(0.1, 0.2))
except asyncio.QueueEmpty:
break
return cnt
def tasks_state(tasks, result=False):
for task in tasks:
print(f"*** {task.get_name()} ***")
print(f"done: {task.done()}")
if result:
print(f"result: {task.result()}")
async def main():
# queue
queue_len = 10
queue = asyncio.Queue()
for i in range(queue_len):
await queue.put(f"{i}")
# consumer task
num_tasks = 3
tasks = []
for i in range(num_tasks):
name = f"c{i}"
task = asyncio.create_task(consumer(name, queue), name=name)
tasks.append(task)
print("\nbefore join")
tasks_state(tasks)
await queue.join()
print("\nafter join")
tasks_state(tasks)
# 実行結果
results = [await task for task in tasks]
print("\nresults")
tasks_state(tasks, True)
print(results)
return 0
asyncio.run(main())
実行例
before join
*** c0 ***
done: False
*** c1 ***
done: False
*** c2 ***
done: False
c0: 0
c1: 1
c2: 2
c2: 3
c0: 4
c1: 5
c2: 6
c0: 7
c1: 8
c2: 9
after join
*** c0 ***
done: False
*** c1 ***
done: False
*** c2 ***
done: False
results
*** c0 ***
done: True
result: 3
*** c1 ***
done: True
result: 3
*** c2 ***
done: True
result: 4
[3, 3, 4]