Producerも一人で重く、Consumerは重い処理をする場合
Producer側も重い場合はasyncio.run
の隠蔽は無理かな。。。
次のような、Producer側にもI/O待ち等がある場合。
async def my_iterator(n):
for x in range(n):
# produce an item
print('producing {}/{}'.format(x, n))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
item = str(x)
# put the item in the queue
yield item
次のように実行。非同期でもそうじゃないイテレータでも対応。
print("=== Pattern A ===")
asyncio.run(request_pc(3, print_async, range(10)))
print("=== Pattern B ===")
asyncio.run(request_pc(3, print_async, my_iterator(10)))
にしても、命名力なさすぎ、request producer-consumer で request_pc ぐらいしか
思いつかない。request_pro_con.これも意味わからん。。。
coroutine 内の例外
consumerで例外が発生した場合、consumeの処理が停止するが、await等で処理していないため
例外が保留されてしまうみたい。
そうすると、producerがqueueがはけないので処理が固まってしまうことが判明。
producerが生きている限り、consumerが死んでないか確認する処理も追加。
全文
関数定義
import asyncio
async def request_pc(num_consumer, async_func, iterator):
async def consume(async_func, queue):
while True:
# wait for an item from the producer
await async_func(await queue.get())
# Notify the queue that the item has been processed
queue.task_done()
async def produce(queue, iterator):
if hasattr(iterator, "__aiter__"):
async for item in iterator:
# put the item in the queue
await queue.put(item)
else:
for item in iterator:
# put the item in the queue
await queue.put(item)
# wait until the consumer has processed all items
await queue.join()
async def watchdog(producer, tasks, timeout=1.0):
while not producer.done():
await asyncio.sleep(timeout)
for t in tasks:
if t.done():
raise ValueError("consume is done." + str(t))
queue = asyncio.Queue(num_consumer)
# schedule the consumer
tasks = []
for x in range(num_consumer):
task = asyncio.ensure_future(consume(async_func, queue))
tasks.append(task)
# schedule the producer
producer = asyncio.ensure_future(produce(queue, iterator))
# run the watchdog and wait for completion
await asyncio.gather(producer, watchdog(producer, tasks))
# the consumer is still awaiting for an item, cancel it
for t in tasks:
t.cancel()
使い方
import random
import time
async def my_iterator(n):
for x in range(n):
# produce an item
print('producing {}/{}'.format(x, n))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
item = str(x)
# put the item in the queue
yield item
async def print_async(item):
st = time.time()
print('consuming {}...'.format(item))
# simulate i/o operation using sleep
sl = random.random()
await asyncio.sleep(sl)
# process the item
print('consuming {}...done({:.3f}, {:.3f})'.format(item, sl, time.time() - st))
print("=== Pattern A ===")
asyncio.run(request_pc(3, print_async, range(10)))
print()
print("=== Pattern B ===")
asyncio.run(request_pc(3, print_async, my_iterator(10)))