0
1

More than 3 years have passed since last update.

asyncioの勉強(Producer/Consumer, その1)

Posted at

Producerが一人で軽く、Consumerは重い処理をする場合

Popen で外部プログラムの実行順序をスケジュールしようと思ったけど、
asyncio.create_subprocess_exec があるのを知ったので、
python の async/awaitの勉強してみた。

関数のリターン値が、コルーチン, Task, Futureどれかわからなくなったり、
run_until_complete, call_soon, ensure_future どう使い分けるのがわからなくなったり、
色々気持ち悪いけど、使うだけなら簡単かな。

asyncio.run の隠蔽

asyncio.run を隠蔽してみたかったので、asyncio_poolを作ってみた。

使い方

with asyncio_pool(3, print_async) as pool:
    for x in range(n):
        print('producing {}/{}'.format(x, n))
        pool.put(x)

どんどんputすると、3までprint_asyncを並行処理してくれる。
print_asyncの部分を、asyncio.create_subprocess_execにすると外部プログラムを並列処理できる。

制限

with asyncio_pool(3, print_async) as pool:
    for x in range(n):
        time.sleep(random.random())
        print('producing {}/{}'.format(x, n))
        pool.put(item=x)

これみたいに、putするまで遅いと、putの中でprint_async等の処理をしているので、よろしくない。

全文

import asyncio
import random
import time

def asyncio_pool(num_worker, async_func, *, debug=None):
    class Pool:
        def __init__(self, num_worker, async_func, *, debug=None):
            self.debug = debug
            self.num_worker = num_worker
            self.async_func = async_func

        async def _worker(self):
            while True:
                # wait for an item from the producer
                args, kwargs = await self.queue.get()
                await self.async_func(*args, **kwargs)
                # Notify the queue that the item has been processed
                self.queue.task_done()

        def __enter__(self):
            if asyncio._get_running_loop() is not None:
                raise RuntimeError(
                    "asyncio_pool() cannot be called from a running event loop")

            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            if self.debug is not None:
                self.loop.set_debug(self.debug)

            self.queue = asyncio.Queue(self.num_worker)

            # schedule the worker
            self.tasks = []
            for x in range(self.num_worker):
                task = asyncio.ensure_future(self._worker(), loop=self.loop)
                self.tasks.append(task)
            return self

        def put(self, *args, **kwargs):
            self.loop.run_until_complete(self.queue.put((args, kwargs)))

        def __exit__(self, exc_type, exc_value, traceback):
            # wait until the consumer has processed all items
            self.loop.run_until_complete(self.queue.join())

            # the consumer is still awaiting for an item, cancel it
            for t in self.tasks:
                t.cancel()

            self.loop.run_until_complete(
                asyncio.gather(*self.tasks, loop=self.loop, return_exceptions=True))
            try:
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
            finally:
                asyncio.set_event_loop(None)
                self.loop.close()
            return False

    return Pool(num_worker, async_func, debug=None)



async def print_async(item):
    st = time.time()
    print('consuming {}...'.format(item))
    # simulate i/o operation using sleep
    sl = random.random()
    await asyncio.sleep(sl)
    # process the item
    print('consuming {}...done({:.3f}, {:.3f})'.format(item, sl, time.time() - st))


print("==== Pattern A ====")
n = 10
with asyncio_pool(3, print_async) as pool:
    for x in range(n):
        print('producing {}/{}'.format(x, n))
        pool.put(x)

print()
print("==== Pattern B ====")

with asyncio_pool(3, print_async) as pool:
    for x in range(n):
        time.sleep(random.random())
        print('producing {}/{}'.format(x, n))
        pool.put(item=x)


0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1