asyncio.TaskGroupでの並行処理を実装した際、雑に作ってしまっていたのを整理した際の覚書
シンプルに並行処理
async def sub_task(target_id):
# なんらかの処理
return result
async def main():
target_ids = [idのリスト]
tasks = []
async with asyncio.TaskGroup() as tg:
for _id in target_ids:
tasks.append(tg.create_task(sub_task(_id)))
# この時点で全taskが終わっている
for _task in tasks:
# task 結果を以て何か処理する場合
print(_task.result())
asyncio.run(main())
TargetGroup.create_task() の戻り値には、Taskオブジェクトが入っている
タスクの結果を以て何か処理を行う場合はこのオブジェクトを使用する
-
Task.done()- タスクが処理済みorキャンセル済みなら True
-
Task.calcelled()- タスクがキャンセルされた場合 True
- 並行で走っている別タスクで例外が発生した場合、他のタスクは自動的にキャンセルされる
-
Task.result()- タスクの結果を取得
- 正常に処理された場合はタスクの戻り値
- タスクで例外が発生していた場合、その例外がraiseされる
- タスクがキャンセルされていた場合、CancelledError がraiseされる
- タスクの結果を取得
-
Task.exception()- タスクで例外が発生していた場合、その例外がraiseされる
- タスクがキャンセルされていた場合、CancelledError がraiseされる
- タスクが正常終了していたら None
なので
並行処理の例外対応?
async def sub_task(target_id):
# なんらかの処理
return result
async def main():
target_ids = [idのリスト]
tasks = []
async with asyncio.TaskGroup() as tg:
for _id in target_ids:
tasks.append(tg.create_task(sub_task(_id)))
# この時点で全taskが終わっている
for _task in tasks:
# task 結果を以て何か処理する場合
try:
print(_task.result())
except CancelledError:
# タスクキャンセルされていた場合の処理
except Exception:
# 例外処理
asyncio.run(main())
↑のように書いて十分な気がしてしまうが、
TaskGroupが ExceptionGroupをraiseしてくる事があり、それをキャッチしておかないと L.61部分でハンドリングされていない例外として異常終了してしまう
なので
並行処理の例外対応
async def sub_task(target_id):
# なんらかの処理
return result
async def main():
target_ids = [idのリスト]
tasks = []
# TaskGroup のExceptionGroupを拾う
try:
async with asyncio.TaskGroup() as tg:
for _id in target_ids:
tasks.append(tg.create_task(sub_task(_id)))
except* TerminateTaskGroup:
# 例外処理は後続でやるため、ここはpassでスルー
pass
# この時点で全taskが終わっている
for _task in tasks:
# task 結果を以て何か処理する場合
try:
print(_task.result())
except CancelledError:
# タスクキャンセルされていた場合の処理
except Exception:
# 例外処理
asyncio.run(main())
公式ドキュメントに書いてるんで、ちゃんと読みましょうと。
asyncio.TaskGroup ドキュメント