はじめに
非同期処理を走らせるためにasyncioというモジュールがある。ここでは、その2重平行化の方法について述べる。2重平行化とは、複数個のタスクを平行処理しようとした時に、各タスク内の処理を展開して平行に処理したい場合である。例えば、5個のメインタスクがあり、各メインタスクは3個のサブタスクを持ち、計15個のサブタスクを処理したい場合を考える。この場合に、15個のサブタスクを完全に平行化したい、というのが今回の趣旨である。スクレイピングなんかで便利だと思う。初めて使ったけど、これで合ってるかな???
通常の平行処理
メインタスクを4つ用意し、各タスクは5, 3, 12, 2秒ずつスリープすることとする。コードは以下となる。当然、12秒したら結果が返ってくる。
import asyncio
async def sleep(s):
for _ in range(s):
await asyncio.sleep(1)
return s
if __name__ == '__main__':
loop = asyncio.get_event_loop()
tasks = [sleep(5), sleep(3), sleep(12), sleep(2)]
futures = asyncio.gather(*tasks)
loop.run_until_complete(futures)
ret = futures.result()
print(ret)
loop.close()
2重平行処理
先の5, 3, 12, 2秒を、1秒ずつに分解し、22個のサブタスクを平行で実行してみる。このケースでは、sleep
関数内でサブタスクを作成し、yield from
でタスクの集合を返す。それが終了したら、各結果をマージ(ここではsum
)して返す。
import asyncio
async def sleep_local():
await asyncio.sleep(1)
return 1
def sleep(s):
subtasks = [sleep_local() for _ in range(s)]
futures = asyncio.gather(*subtasks)
yield from futures
ret = futures.result()
return sum(ret)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
tasks = [sleep(5), sleep(3), sleep(12), sleep(2)]
futures = asyncio.gather(*tasks)
loop.run_until_complete(futures)
ret = futures.result()
print(ret)
loop.close()