13
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

asyncio.TaskGroup での並行処理時のエラーハンドリング

13
Posted at

asyncio.TaskGroupでの並行処理を実装した際、雑に作ってしまっていたのを整理した際の覚書

シンプルに並行処理


async def sub_task(target_id):
    # なんらかの処理
    return result

async def main():
    target_ids = [idのリスト]
    tasks = []
    
    async with asyncio.TaskGroup() as tg:
        for _id in target_ids:
            tasks.append(tg.create_task(sub_task(_id)))

    # この時点で全taskが終わっている

    for _task in tasks:
        # task 結果を以て何か処理する場合
        print(_task.result())

asyncio.run(main())

TargetGroup.create_task() の戻り値には、Taskオブジェクトが入っている
タスクの結果を以て何か処理を行う場合はこのオブジェクトを使用する

  • Task.done()
    • タスクが処理済みorキャンセル済みなら True
  • Task.calcelled()
    • タスクがキャンセルされた場合 True
    • 並行で走っている別タスクで例外が発生した場合、他のタスクは自動的にキャンセルされる
  • Task.result()
    • タスクの結果を取得
      • 正常に処理された場合はタスクの戻り値
      • タスクで例外が発生していた場合、その例外がraiseされる
      • タスクがキャンセルされていた場合、CancelledError がraiseされる
  • Task.exception()
    • タスクで例外が発生していた場合、その例外がraiseされる
    • タスクがキャンセルされていた場合、CancelledError がraiseされる
    • タスクが正常終了していたら None

なので

並行処理の例外対応?


async def sub_task(target_id):
    # なんらかの処理
    return result

async def main():
    target_ids = [idのリスト]
    tasks = []
    
    async with asyncio.TaskGroup() as tg:
        for _id in target_ids:
            tasks.append(tg.create_task(sub_task(_id)))

    # この時点で全taskが終わっている

    for _task in tasks:
        # task 結果を以て何か処理する場合
        try:
            print(_task.result())
        except CancelledError:
            # タスクキャンセルされていた場合の処理
        except Exception:
            # 例外処理

asyncio.run(main())

↑のように書いて十分な気がしてしまうが、
TaskGroupが ExceptionGroupをraiseしてくる事があり、それをキャッチしておかないと L.61部分でハンドリングされていない例外として異常終了してしまう
なので

並行処理の例外対応


async def sub_task(target_id):
    # なんらかの処理
    return result

async def main():
    target_ids = [idのリスト]
    tasks = []

    # TaskGroup のExceptionGroupを拾う
    try:
        async with asyncio.TaskGroup() as tg:
            for _id in target_ids:
                tasks.append(tg.create_task(sub_task(_id)))
    except* TerminateTaskGroup:
        # 例外処理は後続でやるため、ここはpassでスルー
        pass

    # この時点で全taskが終わっている

    for _task in tasks:
        # task 結果を以て何か処理する場合
        try:
            print(_task.result())
        except CancelledError:
            # タスクキャンセルされていた場合の処理
        except Exception:
            # 例外処理

asyncio.run(main())

公式ドキュメントに書いてるんで、ちゃんと読みましょうと。

asyncio.TaskGroup ドキュメント

13
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?