39
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

[Swift] Task Groups の基礎

Posted at

概要

Task Groups は複数のタスクを並列に実行するための仕組みです。この記事では Task Groups の基本的な使い方から、キャンセル周りも含めた細かい振る舞いまで理解していきます。

記事中の動作検証は Xcode 14 Beta 5 で行っています。

TL;DR

  • Task Groups は子タスクを並列に実行したいときに使う。とくに、同じ型を返す動的な数の子タスクの実行に適している
  • Task Group は withTaskGroupwithThrowingTaskGroup で生成できる。子タスクがエラーを投げない場合は前者、投げる場合は後者を使う
  • 基本的な流れとして with(Throwing)TaskGroup に渡すクロージャの中で子タスクを生成する処理とそれを受け取る処理を書く
    • 子タスクの生成は addTask(UnlessCancelled) で行う。典型的には for 文で Collection の要素を回してそれをもとにタスクを生成することが多い
    • 受け取り処理は for try await ... in で行うことが多いが、 TaskGroupAsyncSequence であることを利用して柔軟に処理が書ける
  • Task Groups は処理を抜ける際に明示的に await されていない子タスクが残っていたら await する。 async let はスコープを抜ける際に await されていない子タスクをキャンセルするので振る舞いが異なる
  • Task Group は渡されたクロージャがエラーを投げたときや cancelAll が呼ばれたときに、残っている子タスクをすべてキャンセルする
    • 個々の子タスクがエラーを投げても、その子タスクが結果を返さないだけでキャンセルは発生しないことに注意

Task Groups の使いどころ

まず、 Task Groups がどういう場面で有用なのかを見ていきます。Todo アプリで Done になっていない Todo を表示することを考えます。 API から Todo の情報を取得しますが、以下のように

  • Done になっていない Todo の ID 一覧を取得する API
  • ID から Todo の詳細を取得する API

の2つがあるとします。

enum TodoAPI {
    static func getUndoneTodoIDs() async throws -> [String] { /* ... */ }
    static func getTodoDetail(id: String) async throws -> Todo { /* ... */ }
}

API の構成を考えると、まず Todo の ID 一覧を getUndoneTodoIDs で取得し、そのレスポンスの中に含まれる ID それぞれに対して getTodoDetail を呼び、返ってきた Todo をまとめる必要があることがわかります。以下のようなコードになるでしょう。

func getUndoneTodos() async throws -> [Todo] {
    let todoIDs = try await TodoAPI.getUndoneTodoIDs()
        
    var todos: [Todo] = []
    for id in todoIDs {
        let todo = try await TodoAPI.getTodoDetail(id: id)
        todos.append(todo)
    }
    return todos
}

このコードには明らかな問題があります。 getTodoDetail を1つ目のリクエストが完了してから2つ目のリクエストを実行するというように直列に呼んでいるため getUndoneTodoIDs で返ってくる ID の数が増えれば増えるほど完了に時間がかかるということです。仮に getTodoDetail のレスポンスが1秒程度で getUndoneTodoIDs から ID が 10 個返ってくると getUndoneTodos 全体の実行が完了するまでに 10 秒以上はかかることになります。

2つ目のリクエストが1つ目のリクエストの結果が返ってくるのを待っている必要はないので、 getTodoDetail を ID の個数だけ並列に呼び出すことでこの問題は解決します。このようなときに役立つのが Task Groups です。細かい部分についてはこの記事の後程見ていきますが、大まかな使い方は以下のようになります。

func getUndoneTodos() async throws -> [Todo] {
    let todoIDs = try await TodoAPI.getUndoneTodoIDs()
        
    // 1
    return try await withThrowingTaskGroup(of: Todo.self, returning: [Todo].self) { group in
        // 2
        for id in todoIDs {
            group.addTask {
                try await TodoAPI.getTodoDetail(id: id)
            }
        }
        
        // 3
        var todos: [Todo] = []
        for try await todo in group {
            todos.append(todo)
        }
        return todos
    }
}

まず、 1withThrowingTaskGroup を呼び出して Task Group を生成しています。3つ目の引数のクロージャに実際に行いたい処理を書きますが、そのクロージャの引数に TaskGroup のインスタンスが渡ってきます。 2 では for 文で子タスクを起動しています。 Task Groups を使わない例では for 文の中で await していたので1つ目の Todo の取得が終わってからでないと2つ目の Todo の取得を開始することができませんでしたが、 TaskGroupaddTask はタスクを生成するだけブロックしないので、結果待たずにすべての Todo の取得を並行に開始することができます。最後に 3TaskGroup に対して for try await ... in することで返ってきた結果を得ることができます。

以上のように並行に Todo の取得を行うことで、 getTodoDetail のレスポンスが1秒で完了するとき、最もうまくいく状況であれば ID の数が何個であってもおおよそ1秒ですべての Todo の取得が完了します。
注意点として、 for try await ... in に渡ってくる Todo の順序は子タスクの生成順ではなく完了順になります。そのため、 todoIDs の順番と getUndoneTodos の返り値の Todo の順番は一般に異なります。もしこれらの順番を揃えたい場合は自分でコードを書いて保証する必要があります。
また、Task Groups には並列実行数の制限がありません。極端な例だと getUndoneTodoIDs が 1000 個の id を返してきたら 1000 個の getTodoDetail が並行に走ってしまうことになります。並列実行数を一定以下に抑えたい場合もその処理を自分で書く必要があります。

Task Groups の利用場面としては、今回の例のように

  • 実行時に決まる個数の処理を並行に行いたい
  • それぞれの処理の結果の型がすべて同じ

という場合が典型的です 1。このような場合に並行に行いたい処理を for 文で addTask で立ち上げ、 for try await ... in で結果を受け取るという使い方をします。

逆に、並列に行いたい処理の個数がコンパイル時に決まっていたり、それらの処理の結果の型が異なる場合は、Swift Concurrency において子タスクを生成するもう一つの方法である async let を使うことが多いです。この記事では深入りしませんが、例えば Todo に画像を添付することができ、 Todo と画像を別の API エンドポイントから取得する必要がある場合を考えると、以下のように async let が使えます。これにより Todo と画像を並行して取得することができます。

enum TodoAPI {
    static func getTodoDetail(id: String) async throws -> Todo { /* ... */ }
    static func getTodoImage(id: String) async throws -> UIImage? { /* ... */ }
}

func getTodoWithImage(id: String) async throws -> (Todo, UIImage?) {
    async let todo = TodoAPI.getTodoDetail(id: id)
    async let image = TodoAPI.getTodoImage(id: id)
    return try await (todo, image)
}

Task Groups の使い方

Task Groups の概要がわかったところで、使い方をもう少し詳しく見ていきます。

2つの生成メソッド

前の項では Task Group の生成に withThrowingTaskGroup を使いましたが、もう一つ withTaskGroup という関数もあります。それぞれのシグネチャを見ておきます。

func withTaskGroup<ChildTaskResult, GroupResult>(
    of childTaskResultType: ChildTaskResult.Type,
    returning returnType: GroupResult.Type = GroupResult.self,
    body: (inout TaskGroup<ChildTaskResult>) async -> GroupResult
) async -> GroupResult where ChildTaskResult : Sendable

func withThrowingTaskGroup<ChildTaskResult, GroupResult>(
    of childTaskResultType: ChildTaskResult.Type,
    returning returnType: GroupResult.Type = GroupResult.self,
    body: (inout ThrowingTaskGroup<ChildTaskResult, Error>) async throws -> GroupResult
) async rethrows -> GroupResult where ChildTaskResult : Sendable

この2つの関数の違いは、 withTaskGroup の方はエラーを投げないのに対して withThrowingTaskGroup の方では主な処理を書く bodythrows 、関数全体も rethrows になっているところです。ちょっと複雑に思えますが、並列に実行したい子タスクがエラーを投げるならば withThrowingTaskGroup 、投げないならば withTaskGroup を使うと覚えておけば大丈夫だと思います。

withTaskGroup の使い方として、エラーを投げない geenerateRandomNumber という関数を並行に呼んで乱数の列を取得する例を考えます。

func generateRandomNumber() async -> Int { /* ... */ }

func generateRandomNumbers(count: Int) async -> [Int] {
    await withTaskGroup(of: Int.self, returning: [Int].self) { group in
        for _ in 0..<count {
            group.addTask {
                await generateRandomNumber()
            }
        }
        
        var randomNumbers: [Int] = []
        for await number in group {
            randomNumbers.append(number)
        }
        return randomNumbers
    }
}

結果をまとめるところで for try await ... in ではなく for await ... in を使っているところや、 withTaskGroup 自体の呼び出しに try が必要ないところが withThrowingTaskGroup と違いますが、逆にそれ以外の部分については同じです。 Swift Concurrency はエラーを使ってキャンセルの仕組みを実現しているので、プロダクションコードを書く上では withThrowingTaskGroup を使うケースの方が多いと思います。

以下、 withTaskGroupwithThrowingTaskGroup に共通する話においては2つをまとめて with(Throwing)TaskGroup と書くことにします。

with(Throwing)TaskGroup の引数について見ていきます。引数は3つ取りますが、 3つ目の body(Throwing)TaskGroup を受け取って子タスクの生成と結果の加工を行うクロージャです。 1つ目の of には子タスクの結果の型を、2つ目の returning には body の返り値の型、すなわち with(Throwing)TaskGroup 自体の返り値の型を渡します。前項の Todo アプリの例では、子タスクは TodobodyTodo を詰めた Array を返していたので of には Todo.selfreturning には [Todo].self を渡していました。このように returningofCollection になるケースもありますが、必ずしもそうなるわけではありません。例えば、 Done になっていない Todo のタイトルをカンマで区切って繋げた文字列がほしいとすると、以下のように returning には String.self を指定することになります。

func getUndoneTodosTitleString() async throws -> String {
    let todoIDs = try await TodoAPI.getUndoneTodoIDs()
        
    return try await withThrowingTaskGroup(of: Todo.self, returning: String.self) { group in
        for id in todoIDs {
            group.addTask {
                try await TodoAPI.getTodoDetail(id: id)
            }
        }
        
        var todoTitles: [String] = []
        for try await todo in group {
            todoTitles.append(todo.title)
        }
        return todoTitles.joined(separator: ",")
    }
}

ここまでは returning を明示的に書いてきましたが、シグネチャを見ると分かるように returning にはデフォルト引数が設定されており、これが bodywith(Throwing)TaskGroup の返り値と同じ型になっています。型推論がうまく効いていれば書く必要はないので、基本的には自分で明示的に書くケースの方が少ないでしょう。

func getUndoneTodosTitleString() async throws -> String {
    let todoIDs = try await TodoAPI.getUndoneTodoIDs()
        
    // ✅ returning を省略してもコンパイルが通る
    return try await withThrowingTaskGroup(of: Todo.self) { group in
		// ...
    }
}

続いて、 with(Throwing)TaskGroup に渡す主な処理である body の書き方について詳しく見ていきます。Todo アプリの例もそうでしたが、 body の処理は基本的に

  1. 並行に実行する子タスクを生成する
  2. 子タスクの結果を受け取り加工して返す

の2つの段階に分けられるので、それぞれについて整理します。

子タスクの生成

子タスクの生成は TaskGroup インスタンスの addTask メソッドで行います。 Task Groups の使い道として、何らかの Collection の要素に対してそれぞれ子タスクを生成するというケースがよくあるので、 Todo アプリの例のように for 文の中で addTask することになる場合が多いでしょう。もちろん、以下のように for 文の中でなくとも addTask することはできます。

func getTwoTodos() async throws -> [Todo] {
    try await withThrowingTaskGroup(of: Todo.self, returning: [Todo].self) { group in
        group.addTask {
            try await TodoAPI.getTodoDetail(id: "id1")
        }
        group.addTask {
            try await TodoAPI.getTodoDetail(id: "id2")
        }
        
        var todos: [Todo] = []
        for try await todo in group {
            todos.append(todo)
        }
        return todos
    }
}

ここまでは省略していましたが、 addTask のシグネチャを見ると Task.initTask.detached と同じように priority が渡せることがわかります。

mutating func addTask(
    priority: TaskPriority? = nil,
    operation: @escaping () async -> ChildTaskResult
)

priority を明示的に渡す場合は以下のようになります。

// ...
group.addTask(priority: .background) {
    try await TodoAPI.getTodoDetail(id: "id1")
}
// ...

priority を省略した場合は Task Group を生成した Taskpriority が使われるため、とくに事情がない限りは省略しておけばよいと思います。

addTask と似たメソッドに addTaskUnlessCancelled があります。

mutating func addTaskUnlessCancelled(
    priority: TaskPriority? = nil,
    operation: @escaping () async -> ChildTaskResult
) -> Bool

addTask との違いは

  • すでに Task Group がキャンセルされている場合は子タスクを生成しない
  • 子タスクを生成したかどうかを Bool で返す

ということです。キャンセルされている場合に子タスクの処理を開始すらしたくないという場合は addTaskUnlessCancelled を使うことになりますが、そのような状況はあまりないように思います。子タスクが適切にキャンセルをハンドリングするように実装されていれば、すでに Task Group がキャンセルした場合に addTask で子タスクが開始されてしまってもすぐキャンセルに対応して処理を抜け、無駄にリソースを使うことにはならないためです。ただ、もし子タスクがキャンセルに対応していない場合や Task Group がキャンセルされるまで子タスクを追加し続けたいという場合などは addTaskUnlessCancelled が使えると思うので、存在を覚えておいてもよいでしょう。

子タスクの結果の受け取り

Todo アプリの例では addTask で起動した子タスクの結果の受け取りを for try await ... in で行っていましたが、これができるのは TaskGroupAsyncSequence に準拠しているためです。 AsyncSequence の使い方について詳しくはドキュメントを参照ください。

TaskGroupAsyncSequence であることを利用すると、 for try await ... in 以外の結果の受け取り方もできます。例えば、 Todo アプリの例では子タスクが返してきた Todo を事前に作っておいた Arrayappend していましたが、 AsyncSequencereduce メソッドを使えば同じことがより簡潔に書けます。

func getUndoneTodos() async throws -> [Todo] {
    let todoIDs = try await TodoAPI.getUndoneTodoIDs()
        
    return try await withThrowingTaskGroup(of: Todo.self, returning: [Todo].self) { group in
        for id in todoIDs {
            group.addTask {
                try await TodoAPI.getTodoDetail(id: id)
            }
        }

-       var todos: [Todo] = []
-       for try await todo in group {
-           todos.append(todo)
-       }
-       return todos
+       return try await group.reduce(into: []) { result, todo in result.append(todo) }
    }
}

ここではこれ以上の例は挙げませんが、他にも TaskGroup インスタンスに対して mapfilter など AsyncSequence に対するメソッドを使うことで、処理が簡潔かつわかりやすく書けることがあります。

また、 TaskGroup には next メソッドが生えていて、 AsyncIteratorProtocol と同じようなイメージで値を1つずつ取り出すことができます。例えば、以下のように書くことで for try await ... in と同じ意味になります。

func getUndoneTodos() async throws -> [Todo] {
    let todoIDs = try await TodoAPI.getUndoneTodoIDs()
        
    return try await withThrowingTaskGroup(of: Todo.self, returning: [Todo].self) { group in
        for id in todoIDs {
            group.addTask {
                try await TodoAPI.getTodoDetail(id: id)
            }
        }

        var todos: [Todo] = []
-       for try await todo in group {
-           todos.append(todo)
-       }
+       while let todo = try await group.next() {
+           todos.append(todo)
+       }
        return todos

    }
}

これだけだと for try await ... in を使えばいいだけなのでメリットが感じられませんが、例えばいくつか子タスクを起動して一番速く返ってきた結果を使いたいということを考えると、以下のように書けます。 next を1回しか読んでいないので for try await ... in と異なりすべての子タスクを待つことなく結果を返せます。

func getFirstNumber() async throws -> Int {
    try await withThrowingTaskGroup(of: Int.self) { group in
        for _ in 0..<10 {
            group.addTask {
                try await getNumber()
            }
        }
        
        let number = try await group.next()!
        group.cancelAll()
        return number
    }
}

また、 next から派生した nextResult も有用です。 nextfor try await ... in は子タスクがエラーを投げたときにそのままエラーを投げ直すので、子タスクがエラーを投げると Swift Concurrency のキャンセルの仕組みに従って他の子タスクがすべてキャンセルされます。 nextResult は子タスクがエラーを投げてもそれをそのまま投げるのではなく Swift の Result 型の failure で受け取るので、子タスクが失敗した際のキャンセルが働かなくなります。
これを利用して、例えば失敗しやすい非同期処理を N 個投げて成功した最初の M 個の値を返すというようなことができます。

func getResultFromFailureProneAPI() async throws -> Int { /* ... */  }

func getSomeResultsFromFailureProneAPI(n: Int, m: Int) async throws -> [Int] {
    await withThrowingTaskGroup(of: Int.self) { group in
        for _ in 0..<n {
            group.addTask {
                try await getResultFromFailureProneAPI()
            }
        }
        
        var results: [Int] = []
        while let result = await group.nextResult() {
            switch result {
            case .success(let value):
                results.append(value)
                if results.count >= m {
                    group.cancelAll()
                    return results
                }
            case .failure:
                break
            }
        }
        return results
    }
}

Task Groups における結果の受け取りは for try await ... in で行うことが多いと思いますが、ここで紹介したいくつかのパターンのように要件に合わせて柔軟に行うことができます。

Task Groups と子タスクの寿命

Swift の Structured Concurrency の根幹を成すのは「タスクツリー上の子タスクは親タスクよりも長生きできない」という原則です。これにより並行処理の状態が把握しやすくなり、また実行中の並行処理を漏れなくキャンセルすることも可能になっています。

Task Groups では、 body 内で await されなかった子タスクは body を抜ける前に暗黙的に await することでこの原則を守っています。子タスクを await しないまま body を抜けてしまうと子タスクが Task Group 自体のタスクよりも長生きしてしまうためです。

具体例で見てみます。以下の例では、 addTask した子タスクを for try await ... in で待たないまま body を抜けようとしています。

func double(number: Int) async throws -> Int {
    try await Task.sleep(nanoseconds: 1_000_000_000)
    let doubled = number * 2
    print("returning \(doubled)")
    return doubled
}

func execTaskGroup() async throws {
    await withThrowingTaskGroup(of: Int.self) { group in
        for n in 1...3 {
            group.addTask {
                try await double(number: n)
            }
        }
		print("finishing body")
    }
    print("finishing execTaskGroup")
}

execTaskGroup を実行すると、以下のような出力が得られます。

finishing body
-- ここで1秒経過 --
returning 6
returning 2
returning 4
finishing execTaskGroup

body の最後の print 文が実行されてから、明示的に await していないにも関わらず double が終わりまで実行され、その後に execTaskGroup が完了していることがわかります。これにより、Task Groups は自身を完了させる前に await されない子タスクを待つことがわかります。

これは当たり前のように思えるかもしれませんが、実は Swift Concurrency において子タスクを生成するもう一つの方法である async let は別の方針で「子タスクは親タスクよりも長生きできない」原則を実現しています。スコープを抜ける前に await されない子タスクを暗黙的にキャンセルする、という方針です。
async let の振る舞いも実際に見てみます。以下のコードでも Task Groups と同じように double を3回呼んで、 await しないままスコープを抜けようとしています。

func execAsyncLets() async throws {
    async let n1 = double(number: 1)
    async let n2 = double(number: 2)
    async let n3 = double(number: 3)

    print("finishing execAsyncLets")
}

execAsyncLets を実行してみると以下の出力が得られます。

finishing execAsyncLets

Task Groups の場合と異なり、 returning [n] の出力は行われません。これは、execAsyncLets のスコープを抜けるときに double の呼び出しが await されずにキャンセルされるためです。キャンセルが実際に発生していることは、例えば以下の print デバッグで確認することができます。

func double(number: Int) async throws -> Int {
+   do {
        try await Task.sleep(nanoseconds: 1_000_000_000)
+   } catch {
+       if Task.isCancelled {
+           print("double cancelled with error: \(error)")
+           throw error
+       }
+   }
    let doubled = number * 2
    print("returning \(doubled)")
    return doubled
}

上記の print を足して再度 execAsyncLets を実行すると出力は以下のように変わります。

finishing execAsyncLets
double cancelled with error: CancellationError()
double cancelled with error: CancellationError()
double cancelled with error: CancellationError()

execAsyncLets のスコープを抜ける際に3つの子タスクがすべてキャンセルされていることがわかります。

以上のように、

  • async let はスコープを抜ける際に await されていない子タスクをキャンセルする
  • Task Groups はスコープを抜ける際に await されていない子タスクを await する

という違いがあることは認識しておいた方がいいと思います。この違いのため、 Task Groups では Collection に対する値を返さない処理をやりやすくなっています。例えば、以下のように Array の要素に対して書き込み処理を行いたいとき、個々の子タスクが値を返さないので addTask する処理を返すのが自然です。何も書かなくても暗黙的にすべての子タスクを await してくれることによりすべての save が完了してからスコープを抜けることが保証されています。

func saveEntities(entities: [Entity]) async throws {
    await withTaskGroup(of: Void.self) { group in
        for entity in entities {
            group.addTask {
                await Database.save(entity)
            }
        }
        // ✅ 何も書かなくてもすべての save が完了するのを待ってくれる
    }
}

このような場合にすべての子タスクが終了してから何かをやりたいという場合は waitForAll を使うこともできます。

func saveEntities(entities: [Entity]) async throws {
    await withTaskGroup(of: Void.self) { group in
        for entity in entities {
            group.addTask {
                await Database.save(entity)
            }
        }
+       await group.waitForAll()
+       print("finished all tasks")
    }
}

for await _ in group {} と書いても同じ動作になりますが、 waitForAll を使った方がより明確かつ簡潔に書くことができます。

Task Groups のキャンセル

記事のここまでの例では、子タスクが失敗したり Task Group を生成した親タスクがキャンセルされたりといった可能性にはあまり触れてきませんでした。ここで Task Group のキャンセル周りの話を見ていきます。

Task Groups がキャンセルされるのは以下の3つの場合です。

  • withThrowingTaskGroupbody がエラーを投げた場合
  • Task Group を生成したタスクがキャンセルされた場合
  • cancelAll が呼ばれた場合

withThrowingTaskGroupbody がエラーを投げた場合

Structured Concurrency において、タスクツリーの中のどこかでエラーが投げられるとツリー内のすべてのタスクがキャンセルされるまでこのエラーが伝播します。 Task Groups でも同様で、 withThrowingTaskGroupbody 内でエラーが投げられた場合は Task Group のまだ完了していないすべての子タスクがキャンセルされます。

例として以下のコードを見てみます。

  • 1秒後に 1 を返す
  • 2秒後に CancellationError を投げる
  • 3秒後に 3 を返す

という3つの子タスクを生成して、結果を for try await ... in で待ち受けています。

func printNumbers() async throws {
    do {
        let result = try await withThrowingTaskGroup(of: Int.self) { group in
            group.addTask {
                try await Task.sleep(nanoseconds: 1_000_000_000)
                print("returning 1")
                return 1
            }
            
            group.addTask {
                try await Task.sleep(nanoseconds: 2_000_000_000)
                print("throwing error")
                throw CancellationError()
            }
            
            group.addTask {
                try await Task.sleep(nanoseconds: 3_000_000_000)
                print("returning 3")
                return 3
            }
            
            var result: [Int] = []
            for try await value in group {
                result.append(value)
            }
            return result
        }
        print("task group completed: \(result)")
    } catch {
        print("task group cancelled")
    }
}

実行してみると以下の出力が得られます。

returning 1
throwing error
task group cancelled

1 が返された後にエラーが投げられて Task Group がキャンセルされています。Task Group のすべての子タスクもキャンセルされるため returning 3 は出力されていないことがわかります。

注意すべき点として、キャンセルが発生するのはあくまで body の中でエラーが投げられた場合のみです。特定の子タスクがキャンセルされただけではそのタスクが結果を返さないだけで、 Task Group 全体に影響を及ぼすことはありません。上記の例で Task Group がキャンセルされているのは子タスクが投げたエラーが body 直下の for try await ... in で再び投げられることが原因であって、子タスクがエラーを投げたこと自体はキャンセルにつながらないということです。これを確認するため、前の項でも紹介した nextResult を使って子タスクが投げたエラーをハンドリングしながら結果を待ち受けてみます。

func printNumbers() async throws {
    do {
        let result = try await withThrowingTaskGroup(of: Int.self) { group in
            group.addTask {
                try await Task.sleep(nanoseconds: 1_000_000_000)
                print("returning 1")
                return 1
            }
            
            group.addTask {
                try await Task.sleep(nanoseconds: 2_000_000_000)
                print("throwing error")
                throw CancellationError()
            }
            
            group.addTask {
                try await Task.sleep(nanoseconds: 3_000_000_000)
                print("returning 3")
                return 3
            }
            
            var result: [Int] = []
-           for try await value in group {
-               result.append(value)
-           }
+           while let nextResult = await group.nextResult() {
+               switch nextResult {
+               case .success(let value):
+                   result.append(value)
+               case .failure(let error):
+                   print("ignoring error: \(error)")
+               }
+           }
            return result
        }
        print("task group completed: \(result)")
    } catch {
        print("task group cancelled")
    }
}

nextResult の結果を switch して、子タスクが投げたエラーを無視するようになっています。実行してみると、以下の出力が得られます。

returning 1
throwing error
ignoring error: CancellationError()
returning 3
task group completed: [1, 3]

エラーを投げた子タスクは無視されて Task Group の実行が続き、最終的に [1, 3] というエラーを無視したすべての子タスクの結果が返されています。このようにエラーに対して柔軟に対応できる場合もあるので、子タスクがエラーを投げたことは直接キャンセルにつながらないことは重要です。

Task Group を生成したタスクがキャンセルされた場合

Structured Concurrency の仕組みに従って、 Task Group を生成したタスクがキャンセルされた場合は Task Group もキャンセルされます。

例として、前項で使った printNumbers からエラーを投げる処理を消したものを使います。

func printNumbers() async throws {
    do {
        let result = try await withThrowingTaskGroup(of: Int.self) { group in
            group.addTask {
                try await Task.sleep(nanoseconds: 1_000_000_000)
                print("returning 1")
                return 1
            }
            
            group.addTask {
                try await Task.sleep(nanoseconds: 2_000_000_000)
                print("returning 2")
                return 2
            }
            
            group.addTask {
                try await Task.sleep(nanoseconds: 3_000_000_000)
                print("returning 3")
                return 3
            }
            
            var result: [Int] = []
            for try await value in group {
                result.append(value)
            }
            return result
        }
        print("task group completed: \(result)")
    } catch {
        print("task group cancelled")
    }
}

この printNumbers を実行開始 2.5 秒後にキャンセルするとどうなるかを見てみます。

func f() async throws {
    let targetTask = Task {
        try await printNumbers()
    }
    
    let cancellingTask = Task {
        try await Task.sleep(nanoseconds: 2_500_000_000)
        targetTask.cancel()
    }
    
    try await (targetTask.value, cancellingTask.value)
}

得られる出力は以下です。

returning 1
returning 2
task group cancelled

Task Group の3つ目の子タスクが 3 を返す前に Task Group を生成したタスクがキャンセルされているため、 Task Group もキャンセルされていることがわかります。

関連して、 withTaskGroup とキャンセルの関係についても整理しておきます。 withTaskGroupwithThrowingTaskGroup と違って body でエラーを投げることができないので一見キャンセルに何も対応できないように思えるかもしれません。もちろんエラーを投げることができないので自分自身がキャンセルの起点になることはできませんが、タスクツリー上の他のタスクがエラーを投げたときにそれに対応して実行を終了したり、部分的な結果を返したりするという形で対応することができます。その例として、 printNumbers で子タスクのシグネチャがエラーを投げないよう、以下のように変更します。

func printNumbersWithoutThrowing() async throws {
    let result = await withTaskGroup(of: Int?.self) { group in
        group.addTask {
            do {
                try await Task.sleep(nanoseconds: 1_000_000_000)
                print("returning 1")
                return 1
            } catch {
                print("receiving error, returning nil")
                return nil
            }
        }
        
        group.addTask {
            do {
                try await Task.sleep(nanoseconds: 2_000_000_000)
                print("returning 2")
                return 2
            } catch {
                print("receiving error, returning nil")
                return nil
            }
        }
        
        group.addTask {
            do {
                try await Task.sleep(nanoseconds: 3_000_000_000)
                print("returning 3")
                return 3
            } catch {
                print("receiving error, returning nil")
                return nil
            }
        }
        
        var result: [Int?] = []
        for await value in group {
            result.append(value)
        }
        return result
    }
    print("task group completed: \(result)")
}

addTask で追加する子タスクの中でエラーを catch して nil を返すことで、子タスク自体はエラーを投げなくなります。これにより withThrowingTaskGroup ではなく withTaskGroup を使うことができるようになり、関連する try も外せています。

この printNumbersWithoutThrowing を 2.5 秒後にキャンセルするとどうなるか見てみます。

func f() async throws {
    let targetTask = Task {
        try await printNumbersWithoutThrowing()
    }
    
    let cancellingTask = Task {
        try await Task.sleep(nanoseconds: 2_500_000_000)
        targetTask.cancel()
    }
    
    try await (targetTask.value, cancellingTask.value)
}

実行結果は以下のようになります。

returning 1
returning 2
receiving error, returning nil
task group completed: [Optional(1), Optional(2), nil]

キャンセルを受けた時点で完了していない Task Group の子タスクは3つ目のみですが、これはキャンセルを無視することなく即座に nil を返しています。結果として、 printNumbersWithoutThrowing はすべての子タスクをまとめた完全な結果を得ることはできてはいませんが、キャンセルを受けた時点でできる限りの結果を print しています。

以上のように、 withTaskGroup でエラーを投げない子タスクを生成した場合もそれ自身がキャンセルを発生させないだけで親タスクから伝播してきたキャンセルを適切にハンドリングすることが可能です。とはいえ、キャンセルを無視して実行を続ける子タスクも考えることはできるので、キャンセル時の振る舞いは子タスクの実装次第ではあります。

cancelAll が呼ばれた場合

子タスクでエラーが発生したわけではないが Task Group からキャンセルを実行したい場合に body 内で TaskGroupcancelAll メソッドを呼びます。

cancelAll が有用な例として、タイムアウトを設定しつつ非同期処理を実行する場合があります。

func getNumber() async throws -> Int { /* ... */ }

func getNumberWithTimeout(timeoutNanoseconds: UInt64) async throws -> Int {
    try await withThrowingTaskGroup(of: Int.self) { group in
        group.addTask {
            try await getNumber()
        }
        
        group.addTask {
            try await Task.sleep(nanoseconds: timeoutNanoseconds)
            throw CancellationError()
        }
        
        let result = try await group.next()!
        group.cancelAll()
        return result
    }
}

Task Group は子タスクを2つ生成します。1つは本来実行したい処理である getNumber で、もう1つはタイムアウト用のタスクです。タイムアウト用のタスクは指定された時間だけ待ってから CancellationError を投げることで、 getNumber に想定外に時間がかかったときに getNumberWithTimeout の呼び出し元を待たせることなく処理をキャンセルすることができます。

もし getNumber が無事タイムアウト前に終わった場合は try await group.next()! で結果を得られます。注意点として、その直後に group.cancelAll() で未完了の子タスクをキャンセルしています。この場合の未完了の子タスクというのはタイムアウト用のタスクのことです。

cancelAll の呼び出しがない場合にどうなるか考えてみます。すでに説明したように、 Task Groups は body を抜ける時点で await されていない子タスクを暗黙的に await します。 cancelAll でキャンセルしない限りタイムアウト用の子タスクは await されないまま残るので、例えば getNumber が1秒で完了する場合でもタイムアウトに 10 秒が設定されていた場合に getNumberWithTimeout の完了には 10 秒かかってしまうことになります。これを避けるために cancelAll が使われています。

参考

  1. もちろんこれは典型的に Task Groups が有効な場面であって、固定の数の処理や、異なる型を返す処理に対しても Task Groups を使いたくなる場面はあります。

39
21
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
39
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?