Producerが一人で軽く、Consumerは重い処理をする場合
Popen で外部プログラムの実行順序をスケジュールしようと思ったけど、
asyncio.create_subprocess_exec があるのを知ったので、
python の async/awaitの勉強してみた。
関数のリターン値が、コルーチン, Task, Futureどれかわからなくなったり、
run_until_complete, call_soon, ensure_future どう使い分けるのがわからなくなったり、
色々気持ち悪いけど、使うだけなら簡単かな。
asyncio.run の隠蔽
asyncio.run
を隠蔽してみたかったので、asyncio_pool
を作ってみた。
使い方
with asyncio_pool(3, print_async) as pool:
for x in range(n):
print('producing {}/{}'.format(x, n))
pool.put(x)
どんどんput
すると、3
までprint_async
を並行処理してくれる。
print_async
の部分を、asyncio.create_subprocess_exec
にすると外部プログラムを並列処理できる。
制限
with asyncio_pool(3, print_async) as pool:
for x in range(n):
time.sleep(random.random())
print('producing {}/{}'.format(x, n))
pool.put(item=x)
これみたいに、put
するまで遅いと、put
の中でprint_async
等の処理をしているので、よろしくない。
全文
import asyncio
import random
import time
def asyncio_pool(num_worker, async_func, *, debug=None):
class Pool:
def __init__(self, num_worker, async_func, *, debug=None):
self.debug = debug
self.num_worker = num_worker
self.async_func = async_func
async def _worker(self):
while True:
# wait for an item from the producer
args, kwargs = await self.queue.get()
await self.async_func(*args, **kwargs)
# Notify the queue that the item has been processed
self.queue.task_done()
def __enter__(self):
if asyncio._get_running_loop() is not None:
raise RuntimeError(
"asyncio_pool() cannot be called from a running event loop")
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
if self.debug is not None:
self.loop.set_debug(self.debug)
self.queue = asyncio.Queue(self.num_worker)
# schedule the worker
self.tasks = []
for x in range(self.num_worker):
task = asyncio.ensure_future(self._worker(), loop=self.loop)
self.tasks.append(task)
return self
def put(self, *args, **kwargs):
self.loop.run_until_complete(self.queue.put((args, kwargs)))
def __exit__(self, exc_type, exc_value, traceback):
# wait until the consumer has processed all items
self.loop.run_until_complete(self.queue.join())
# the consumer is still awaiting for an item, cancel it
for t in self.tasks:
t.cancel()
self.loop.run_until_complete(
asyncio.gather(*self.tasks, loop=self.loop, return_exceptions=True))
try:
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
finally:
asyncio.set_event_loop(None)
self.loop.close()
return False
return Pool(num_worker, async_func, debug=None)
async def print_async(item):
st = time.time()
print('consuming {}...'.format(item))
# simulate i/o operation using sleep
sl = random.random()
await asyncio.sleep(sl)
# process the item
print('consuming {}...done({:.3f}, {:.3f})'.format(item, sl, time.time() - st))
print("==== Pattern A ====")
n = 10
with asyncio_pool(3, print_async) as pool:
for x in range(n):
print('producing {}/{}'.format(x, n))
pool.put(x)
print()
print("==== Pattern B ====")
with asyncio_pool(3, print_async) as pool:
for x in range(n):
time.sleep(random.random())
print('producing {}/{}'.format(x, n))
pool.put(item=x)