LoginSignup
6
2

More than 1 year has passed since last update.

AsyncStream.ContinuationのonTerminationが呼ばれるタイミング

Posted at

onTerminationが呼ばれるタイミング

SE-0314に以下の記載があります。

Cancellation Handlers
When defined, a continuation’s onTermination handler function is called when iteration ends, when the stream goes out of scope, or when the task containing the stream is canceled.

onTerminationは以下3つのタイミングで呼ばれるとあります。

  1. イテレーションが終了したとき
  2. ストリームがスコープから外れたとき
  3. ストリームを内包するTaskがキャンセルされたとき

3つのケースについて、コード例を示しながら説明します。

イテレーションが終了したとき

continuation.finish()メソッドを呼び出すと、イテレーションを終了させることができます。

以下の例では、ストリームが生成されると1秒毎に0からカウントアップし、4をストリームに流したらfinish()を呼び出すようにしています。

let stream = AsyncStream<Int> { continuation in
    let task = Task.detached {
        for i in 0..<5 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            continuation.yield(i)
        }
        continuation.finish()
    }
    continuation.onTermination = { termination in
        switch termination {
        case .cancelled:
            print("cancelled")
        case .finished:
            print("finished")
        }
        task.cancel()
    }
}

Task.detached {
    for await i in stream {
        print(i)
    }
}

出力結果は以下の通りです。
イテレーションが終了し、onTerminationが呼ばれてfinishedが出力されています。

※手元で試したところ、4が出力されるより先にfinishedが出力されました。ここは並列処理だから逆になることもある...?

0
1
2
3
finished
4

ストリームがスコープから外れたとき

以下の例では、ストリーム生成後、1秒待機したらTaskの実行が終了します。

finish()が呼ばれる前にTaskが終了してストリームがスコープを抜けてしまうので、ストリームがキャンセルされてしまいます。
ストリームがキャンセルされるとonTerminationが呼ばれます。

Task.detached {
    let stream = AsyncStream<Int> { continuation in
        let task = Task.detached {
            for i in 0..<5 {
                try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
                continuation.yield(i)
            }
            continuation.finish()
        }
        continuation.onTermination = { termination in
            switch termination {
            case .cancelled:
                print("cancelled")
            case .finished:
                print("finished")
            }
            task.cancel()
        }
    }

    try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
}

出力結果は以下の通りで、上記コードの実行開始から1秒後にcancelledが出力されます。

cancelled

ストリームを内包するTaskがキャンセルされたとき

ここでは要素が永久的に流れ続けるストリームを想定します。
ストリームに終了の概念がないので、finish()の呼び出しはありません。
NotificationCenterの通知をイメージしてもらうといいかもしれません。

TimerはAsyncSequenceに準拠しており、1秒毎に0からカウントアップした数字を出力するシーケンスです。
これをAsyncStreamでラップし、ストリームに数字を流し続けます。

for-await-in文のところで、数字が5まで出力されたら、Taskをキャンセルしています。
ストリームを内包するTaskをキャンセルすると、ストリームもキャンセルされ、onTerminationが呼ばれます。

struct Timer: AsyncSequence {
    typealias Element = Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var count = 0

        mutating func next() async -> Element? {
            do {
                try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            } catch {
                return nil
            }
            var value = count
            count += 1
            return value
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator()
    }
}

let stream = AsyncStream<Int> { continuation in
    let task = Task.detached {
        for await i in Timer() {
            continuation.yield(i)
        }
    }
    continuation.onTermination = { termination in
        switch termination {
        case .cancelled:
            print("cancelled")
        case .finished:
            print("finished")
        }
        task.cancel()
    }
}

let task = Task.detached {
    for await i in stream {
        print(i)
        if i == 5 {
            task.cancel()
        }
    }
}

出力結果は以下の通りで、0〜5までの数字が出力されたあと、cancelledが出力されます。

0
1
2
3
4
5
cancelled

あとがき

要素が流れ続けるストリームを実装しようとしてて、「ストリームに明示的な終了の概念がないときはどうやってストリームをキャンセルしたらいいんだ?」と疑問に思ったのがきっかけで調べてみました。

このようなストリームを実装するときはfinish()を呼ぶ必要はなく、ストリームを内包するTaskをキャンセルしてあげればいいんですね。

6
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
2