
More than 3 years have passed since last update.

Studying asyncio (Producer/Consumer, Part 2)


When a single Producer is slow and the Consumers also do heavy work

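When the Producer side is slow as well, I suppose hiding asyncio.run inside the helper is no longer feasible...
That is the case when the Producer itself has I/O waits and the like, as in the following.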

async def my_iterator(n):
    for x in range(n):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # hand the item to whoever is iterating
        yield item

Run it as follows. Both asynchronous and ordinary (synchronous) iterators are supported.

print("=== Pattern A ===")
asyncio.run(request_pc(3, print_async, range(10)))

print("=== Pattern B ===")
asyncio.run(request_pc(3, print_async, my_iterator(10)))
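
To illustrate how one function can accept both kinds of iterator, here is a minimal sketch based on the same `__aiter__` check that the full listing uses. The names `as_async_iter`, `countdown` and `collect` are made up for this example and are not part of request_pc.

```python
import asyncio

async def as_async_iter(iterable):
    # hypothetical helper: expose a sync or an async iterable
    # through a single "async for" interface
    if hasattr(iterable, "__aiter__"):
        async for item in iterable:
            yield item
    else:
        for item in iterable:
            yield item

async def countdown(n):
    # small async generator used only for this demonstration
    for x in range(n, 0, -1):
        await asyncio.sleep(0)
        yield x

async def collect(iterable):
    # gather every item into a list so the result is easy to inspect
    return [item async for item in as_async_iter(iterable)]

sync_result = asyncio.run(collect(range(3)))
async_result = asyncio.run(collect(countdown(3)))
print(sync_result)   # [0, 1, 2]
print(async_result)  # [3, 2, 1]
```

With a wrapper like this the producer would need only one loop, at the cost of an extra generator layer; the listing in this post keeps the two loops separate instead.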

That said, my naming skills are hopeless: for "request producer-consumer", request_pc was about all
I could come up with. request_pro_con? That one makes no sense either...

Exceptions inside a coroutine

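When an exception occurs in a consumer, that consumer stops processing, but because the task is never awaited,
the exception apparently just stays pending on the task.
As a result the queue never drains, and it turned out that the producer hangs.
So I also added a check that, for as long as the producer is alive, verifies that no consumer has died.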
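
The hang described above can be reproduced in a few lines. This is only a sketch under simple assumptions: `failing_consumer` is a hypothetical consumer that dies on its first item, and a 0.2 second timeout stands in for "waiting forever".

```python
import asyncio

async def failing_consumer(queue):
    # takes one item, then dies without ever calling task_done()
    await queue.get()
    raise RuntimeError("consumer failed")

async def main():
    queue = asyncio.Queue()
    task = asyncio.ensure_future(failing_consumer(queue))
    await queue.put("item")
    try:
        # join() cannot finish because task_done() is never called
        await asyncio.wait_for(queue.join(), timeout=0.2)
        joined = True
    except asyncio.TimeoutError:
        joined = False
    # the exception sits on the task until someone asks for it
    return joined, task.done(), repr(task.exception())

joined, done, exc = asyncio.run(main())
print(joined, done, exc)  # False True RuntimeError('consumer failed')
```

The task is already done and carries the exception, yet nothing surfaces it until `task.exception()` (or an `await` on the task) is called, which is why something has to watch the consumers.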

Full code

Function definition

import asyncio

async def request_pc(num_consumer, async_func, iterator):
    async def consume(async_func, queue):
        while True:
            # wait for an item from the producer
            await async_func(await queue.get())
            # Notify the queue that the item has been processed
            queue.task_done()

    async def produce(queue, iterator):
        if hasattr(iterator, "__aiter__"):
            async for item in iterator:
                # put the item in the queue
                await queue.put(item)
        else:
            for item in iterator:
                # put the item in the queue
                await queue.put(item)

        # wait until the consumer has processed all items
        await queue.join()

    async def watchdog(producer, tasks, timeout=1.0):
        while not producer.done():
            await asyncio.sleep(timeout)
            for t in tasks:
                if t.done():
                    raise ValueError("consume is done." + str(t))

    queue = asyncio.Queue(num_consumer)
    # schedule the consumer
    tasks = []
    for x in range(num_consumer):
        task = asyncio.ensure_future(consume(async_func, queue))
        tasks.append(task)

    # schedule the producer
    producer = asyncio.ensure_future(produce(queue, iterator))

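    try:
        # run the watchdog and wait for completion
        await asyncio.gather(producer, watchdog(producer, tasks))
    finally:
        # the consumers are still waiting for an item (or one has failed):
        # cancel them, and the producer too if it is still running
        producer.cancel()
        for t in tasks:
            t.cancel()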
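
As a possible variation on the polling watchdog, the same check could probably be written with `asyncio.wait` and `FIRST_COMPLETED`, so that a dead consumer is noticed immediately rather than at the next timeout. This is a sketch, not what request_pc does; `wait_producer` and `demo` are hypothetical names, and the consumer is made to fail on item 2 on purpose.

```python
import asyncio

async def wait_producer(producer, consumers):
    # hypothetical helper: returns when the producer finishes,
    # raises as soon as any consumer task ends first
    done, _ = await asyncio.wait(
        {producer, *consumers}, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        if task is not producer:
            task.result()  # re-raises the consumer's own exception
            raise RuntimeError("consumer exited early")
    producer.result()  # propagate a producer failure, if any

async def demo():
    queue = asyncio.Queue(1)

    async def produce():
        for i in range(5):
            await queue.put(i)
        await queue.join()

    async def consume():
        while True:
            item = await queue.get()
            if item == 2:
                raise ValueError("bad item {}".format(item))
            queue.task_done()

    producer = asyncio.ensure_future(produce())
    consumers = [asyncio.ensure_future(consume())]
    try:
        await wait_producer(producer, consumers)
        return "finished"
    except ValueError as e:
        return "stopped: {}".format(e)
    finally:
        # clean up whatever is still running
        producer.cancel()
        for c in consumers:
            c.cancel()

outcome = asyncio.run(demo())
print(outcome)  # stopped: bad item 2
```

Compared with the watchdog, this surfaces the consumer's original exception instead of wrapping it in a ValueError, and it does not add a delay of up to one timeout after the producer finishes.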

Usage

import random
import time

async def my_iterator(n):
    for x in range(n):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # hand the item to whoever is iterating
        yield item

async def print_async(item):
    st = time.time()
    print('consuming {}...'.format(item))
    # simulate i/o operation using sleep
    sl = random.random()
    await asyncio.sleep(sl)
    # process the item
    print('consuming {}...done({:.3f}, {:.3f})'.format(item, sl, time.time() - st))

print("=== Pattern A ===")
asyncio.run(request_pc(3, print_async, range(10)))

print()
print("=== Pattern B ===")
asyncio.run(request_pc(3, print_async, my_iterator(10)))