onTerminationが呼ばれるタイミング
SE-0314に以下の記載があります。
Cancellation Handlers
When defined, a continuation’sonTermination
handler function is called when iteration ends, when the stream goes out of scope, or when the task containing the stream is canceled.
onTermination
は以下3つのタイミングで呼ばれるとあります。
- イテレーションが終了したとき
- ストリームがスコープから外れたとき
- ストリームを内包するTaskがキャンセルされたとき
3つのケースについて、コード例を示しながら説明します。
イテレーションが終了したとき
continuation.finish()
メソッドを呼び出すと、イテレーションを終了させることができます。
以下の例では、ストリームが生成されると1秒毎に0からカウントアップし、4をストリームに流したらfinish()
を呼び出すようにしています。
let stream = AsyncStream<Int> { continuation in
let task = Task.detached {
for i in 0..<5 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(i)
}
continuation.finish()
}
continuation.onTermination = { termination in
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
task.cancel()
}
}
Task.detached {
for await i in stream {
print(i)
}
}
出力結果は以下の通りです。
イテレーションが終了し、onTermination
が呼ばれてfinished
が出力されています。
※手元で試したところ、4が出力されるより先にfinished
が出力されました。ここは並列処理だから逆になることもある...?
0
1
2
3
finished
4
ストリームがスコープから外れたとき
以下の例では、ストリーム生成後、1秒待機したらTaskの実行が終了します。
finish()
が呼ばれる前にTaskが終了してストリームがスコープを抜けてしまうので、ストリームがキャンセルされてしまいます。
ストリームがキャンセルされるとonTermination
が呼ばれます。
Task.detached {
let stream = AsyncStream<Int> { continuation in
let task = Task.detached {
for i in 0..<5 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(i)
}
continuation.finish()
}
continuation.onTermination = { termination in
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
task.cancel()
}
}
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
}
出力結果は以下の通りで、上記コードの実行開始から1秒後にcancelled
が出力されます。
cancelled
ストリームを内包するTaskがキャンセルされたとき
ここでは要素が永久的に流れ続けるストリームを想定します。
ストリームに終了の概念がないので、finish()
の呼び出しはありません。
NotificationCenterの通知をイメージしてもらうといいかもしれません。
TimerはAsyncSequenceに準拠しており、1秒毎に0からカウントアップした数字を出力するシーケンスです。
これをAsyncStreamでラップし、ストリームに数字を流し続けます。
for-await-in文のところで、数字が5まで出力されたら、Taskをキャンセルしています。
ストリームを内包するTaskをキャンセルすると、ストリームもキャンセルされ、onTermination
が呼ばれます。
struct Timer: AsyncSequence {
typealias Element = Int
struct AsyncIterator: AsyncIteratorProtocol {
var count = 0
mutating func next() async -> Element? {
do {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
} catch {
return nil
}
var value = count
count += 1
return value
}
}
func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator()
}
}
let stream = AsyncStream<Int> { continuation in
let task = Task.detached {
for await i in Timer() {
continuation.yield(i)
}
}
continuation.onTermination = { termination in
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
task.cancel()
}
}
let task = Task.detached {
for await i in stream {
print(i)
if i == 5 {
task.cancel()
}
}
}
出力結果は以下の通りで、0〜5までの数字が出力されたあと、cancelled
が出力されます。
0
1
2
3
4
5
cancelled
あとがき
要素が流れ続けるストリームを実装しようとしてて、「ストリームに明示的な終了の概念がないときはどうやってストリームをキャンセルしたらいいんだ?」と疑問に思ったのがきっかけで調べてみました。
このようなストリームを実装するときはfinish()
を呼ぶ必要はなく、ストリームを内包するTaskをキャンセルしてあげればいいんですね。