0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Ryuto_YuzのひとりAdvent Calendar 2024

Day 11

AsyncStreamのキャンセルの仕組みと注意点

Posted at

はじめに

SwiftのAsyncStreamは、呼び出し元のTaskがキャンセルされると、そのキャンセル状態を検知し、AsyncStreamも適切に終了する仕組みを持っています。

本記事では、AsyncStreamがキャンセルされる仕組みや、特定の状況で必要となる注意点について調べたことをまとめます。

呼び出し元のTaskによるキャンセル

func asyncStreamFunction() -> AsyncStream<Int> {
    AsyncStream { continuation in
        for i in 0..<10 {
            continuation.yield(i)
        }
        
        continuation.onTermination = { termination in
            switch termination {
            case .cancelled:
                print("cancelled")
            case .finished:
                print("finished")
            }
        }
    }
}

let task = Task {
    for await value in asyncStreamFunction() {
        do {
            print("stream: ", value)
            try await Task.sleep(for: .seconds(1))
        } catch {
            return
        }
    }
}

Task {
    try? await Task.sleep(for: .seconds(5))
    task.cancel()
}

実行結果

stream:  0
stream:  1
stream:  2
stream:  3
stream:  4
cancelled

AsyncStreamで受け取るクロージャは同期的に処理されます。つまり呼び出し元のTaskのキャンセルが子Taskに伝播しているわけではありません。

ではなぜ呼び出し元のTaskをキャンセルすると、AsyncStreamまでキャンセルされるのか?
気になったので少し調べてみました。

なぜAsyncStreamはキャンセルされるのか?

仕組みの結論: withTaskCancellationHandler

AsyncStreamは内部でタスクのキャンセル状態を監視しており、これにはwithTaskCancellationHandlerが利用されています。

まずAsyncStreamを使って非同期的に値を受け取るにはfor-awaitを使います

for await value in stream {

上記のコードは、以下のように書き直すことができます。

var iteretor = asyncStreamFunction().makeAsyncIterator()
while let value = await iteretor.next() {

このnextの中は以下のような実装になっています。

func next() async -> Element? {
    await withTaskCancellationHandler {
        await withUnsafeContinuation {
          next($0)
        }
    } onCancel: { [cancel] in
        cancel()
    }
}

withTaskCancellationHandlerは現在実行中のTaskがキャンセルされた時の挙動を指定できるメソッドです。

つまり値を受け取るたびに、Taskがキャンセルされているかどうかを判別し、キャンセルされているならAsyncStreamもキャンセルされるようになっているわけです。

注意: AsyncStream内でTaskを利用する場合

AsyncStream内で独自にTaskを生成して非同期処理を行う場合は、呼び出し元のTaskがキャンセルされても内部のTaskは自動的にキャンセルされません。この場合、手動でTaskをキャンセルする必要があります。

func asyncStreamFunction() -> AsyncStream<Int> {
    AsyncStream { continuation in
        let task = Task {
            for i in 0..<10 {
                try? await Task.sleep(for: .seconds(1))
                continuation.yield(i)
                print("task: ", i)
            }
        }
        
        continuation.onTermination = { termination in
            switch termination {
            case .cancelled:
                print("cancelled")
            case .finished:
                print("finished")
            }
        }
    }
}

let task = Task {
    for await value in asyncStreamFunction() {
        print("stream: ", value)
    }
}

Task {
    try? await Task.sleep(for: .seconds(5))
    task.cancel()
}

実行結果

stream:  0
task:  0
stream:  1
task:  1
stream:  2
task:  2
stream:  3
task:  3
cancelled
stream:  4
task:  4
task:  5
task:  6
task:  7
task:  8
task:  9

この場合、呼び出し元のタスクをキャンセルしても、AsyncStream内で生成したTaskはキャンセルされず、処理を続けています。

AsyncStream内で生成したTaskをキャンセルするには、onTerminationハンドラ内でそのTaskを手動でキャンセルする必要があります。

func asyncStreamFunction() -> AsyncStream<Int> {
    AsyncStream { continuation in
        let task = Task {
            for i in 0..<10 {
                do {
                    try await Task.sleep(for: .seconds(1))
                    continuation.yield(i)
                    print("task: ", i)
                } catch {
                    return
                }
            }
        }
        
        continuation.onTermination = { termination in
            switch termination {
            case .cancelled:
                print("cancelled")
            case .finished:
                print("finished")
            }
            
            task.cancel()
        }
    }
}

実行結果

stream:  0
task:  0
stream:  1
task:  1
stream:  2
task:  2
stream:  3
task:  3
task:  4
stream:  4
cancelled

onTermination内でtask.cancel()を呼び出すことで、内部のTaskもキャンセルされるようになりました。

参考にした記事

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?