はじめに
SwiftのAsyncStream
は、呼び出し元のTaskがキャンセルされると、そのキャンセル状態を検知し、AsyncStreamも適切に終了する仕組みを持っています。
本記事では、AsyncStreamがキャンセルされる仕組みや、特定の状況で必要となる注意点について調べたことをまとめます。
呼び出し元のTaskによるキャンセル
func asyncStreamFunction() -> AsyncStream<Int> {
AsyncStream { continuation in
for i in 0..<10 {
continuation.yield(i)
}
continuation.onTermination = { termination in
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
}
}
}
let task = Task {
for await value in asyncStreamFunction() {
do {
print("stream: ", value)
try await Task.sleep(for: .seconds(1))
} catch {
return
}
}
}
Task {
try? await Task.sleep(for: .seconds(5))
task.cancel()
}
実行結果
stream: 0
stream: 1
stream: 2
stream: 3
stream: 4
cancelled
AsyncStreamで受け取るクロージャは同期的に処理されます。つまり呼び出し元のTaskのキャンセルが子Taskに伝播しているわけではありません。
ではなぜ呼び出し元のTaskをキャンセルすると、AsyncStreamまでキャンセルされるのか?
気になったので少し調べてみました。
なぜAsyncStreamはキャンセルされるのか?
仕組みの結論: withTaskCancellationHandler
AsyncStreamは内部でタスクのキャンセル状態を監視しており、これにはwithTaskCancellationHandler
が利用されています。
まずAsyncStreamを使って非同期的に値を受け取るにはfor-await
を使います
for await value in stream {
上記のコードは、以下のように書き直すことができます。
var iteretor = asyncStreamFunction().makeAsyncIterator()
while let value = await iteretor.next() {
このnext
の中は以下のような実装になっています。
func next() async -> Element? {
await withTaskCancellationHandler {
await withUnsafeContinuation {
next($0)
}
} onCancel: { [cancel] in
cancel()
}
}
withTaskCancellationHandler
は現在実行中のTaskがキャンセルされた時の挙動を指定できるメソッドです。
つまり値を受け取るたびに、Taskがキャンセルされているかどうかを判別し、キャンセルされているならAsyncStreamもキャンセルされるようになっているわけです。
注意: AsyncStream内でTaskを利用する場合
AsyncStream内で独自にTaskを生成して非同期処理を行う場合は、呼び出し元のTaskがキャンセルされても内部のTaskは自動的にキャンセルされません。この場合、手動でTaskをキャンセルする必要があります。
func asyncStreamFunction() -> AsyncStream<Int> {
AsyncStream { continuation in
let task = Task {
for i in 0..<10 {
try? await Task.sleep(for: .seconds(1))
continuation.yield(i)
print("task: ", i)
}
}
continuation.onTermination = { termination in
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
}
}
}
let task = Task {
for await value in asyncStreamFunction() {
print("stream: ", value)
}
}
Task {
try? await Task.sleep(for: .seconds(5))
task.cancel()
}
実行結果
stream: 0
task: 0
stream: 1
task: 1
stream: 2
task: 2
stream: 3
task: 3
cancelled
stream: 4
task: 4
task: 5
task: 6
task: 7
task: 8
task: 9
この場合、呼び出し元のタスクをキャンセルしても、AsyncStream内で生成したTaskはキャンセルされず、処理を続けています。
AsyncStream内で生成したTaskをキャンセルするには、onTerminationハンドラ内でそのTaskを手動でキャンセルする必要があります。
func asyncStreamFunction() -> AsyncStream<Int> {
AsyncStream { continuation in
let task = Task {
for i in 0..<10 {
do {
try await Task.sleep(for: .seconds(1))
continuation.yield(i)
print("task: ", i)
} catch {
return
}
}
}
continuation.onTermination = { termination in
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
task.cancel()
}
}
}
実行結果
stream: 0
task: 0
stream: 1
task: 1
stream: 2
task: 2
stream: 3
task: 3
task: 4
stream: 4
cancelled
onTermination内でtask.cancel()
を呼び出すことで、内部のTaskもキャンセルされるようになりました。