はじめに
スクレイピングやAPIの実行などの非同期処理を並列に実行したいが、並列数を指定したい
実装
from asyncio import Semaphore, gather, sleep
from collections.abc import Awaitable
async def parallel(tasks: list[Awaitable], concurrency: int) -> list:
"""複数の非同期タスクを並列実行数の上限を設けつつ並列実行する"""
assert concurrency > 0
sem = Semaphore(concurrency)
async def exec(task):
async with sem:
return await task
return await gather(*[exec(task) for task in tasks])
コード例
非同期関数であるtest関数を3個並列で処理させる
async def test(i: str) -> int:
print(f"start: {i}")
# 非同期処理
await sleep(2)
print(f"end : {i}")
return i
async def process():
tasks = [test(i) for i in range(10)]
result = await parallel(tasks, concurrency=3)
await process()
出力
start: 0
start: 1
start: 2
end : 0
end : 1
end : 2
start: 3
start: 4
start: 5
end : 3
end : 4
end : 5
start: 6
start: 7
start: 8
end : 6
end : 7
end : 8
start: 9
end : 9