3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

`AsyncStream`を複数箇所で購読する

3
Last updated at Posted at 2025-12-28

はじめに

AsyncStreamでマルチブロードキャストを実現するための方法です。
前提として、AsyncStream自体は複数箇所での購読に対応していないです。

forumsにも複数TaskからAsyncStreamを使用する方法の質問が投げられたりもしていました。

そんな中、SE-0468がSwift 6.2で実装されたため、複数箇所での購読を実現する方法を調べてまとめました。

そもそもNotificationCenterを使えばいいんじゃねとか思ったり。iOS 26から型安全に使えるようになったし

期待通りに動かない例

例えば以下のようなコードでは taskAtaskB で値が分配されてしまいます。

actor Sender {
    let values: AsyncStream<Int>
    
    init() {
        values = AsyncStream { continuation in
            Task {
                for i in 1...9 {
                    try? await Task.sleep(for: .seconds(1))
                    continuation.yield(i)
                }
                continuation.finish()
            }

            continuation.onTermination = { @Sendable termination in
                switch termination {
                    case .cancelled:
                        print("cancelled")
                    case .finished:
                        print("finished")
                }
            }
        }
    }
}

let sender = Sender()

let taskA = Task {
    for await value in await sender.values {
        print("A: \(value)")
    }
}

let taskB = Task {
    for await value in await sender.values {
        print("B: \(value)")
    }
}

/*
A: 1
B: 2
A: 3
B: 4
A: 5
B: 6
A: 7
B: 8
A: 9
finished
*/

さらに、途中で taskA をキャンセルすると taskB も一緒にキャンセルされてしまいます。

...

Task {
    try? await Task.sleep(for: .seconds(5))
    taskA.cancel()
}

/*
A: 1
B: 2
A: 3
B: 4
cancelled
*/

期待通りに動かない例 2

それなら次のコードだとどうでしょう?

actor Sender {
    var values: AsyncStream<Int> {
        AsyncStream { continuation in
            Task {
                for i in 1...9 {
                    try? await Task.sleep(for: .seconds(1))
                    continuation.yield(i)
                }
                continuation.finish()
            }
            
            continuation.onTermination = { @Sendable termination in
                switch termination {
                    case .cancelled:
                        print("cancelled")
                    case .finished:
                        print("finished")
                }
            }
        }
    }
}

let sender = Sender()

let taskA = Task {
    for await value in await sender.values {
        print("A: \(value)")
    }
}

let taskB = Task {
    for await value in await sender.values {
        print("B: \(value)")
    }
}

Task {
    try? await Task.sleep(for: .seconds(5))
    taskA.cancel()
}

/*
B: 1
A: 1
A: 2
B: 2
B: 3
A: 3
A: 4
B: 4
cancelled
B: 5
B: 6
B: 7
B: 8
B: 9
finished
*/

途中で taskA をキャンセルしてもその後 taskB には値が流れており一見よさそうに見えます。
しかしこれで本当に良いのでしょうか?
例えば以下のようにして確認してみましょう

Task {
    let taskA = Task {
        for await value in await sender.values {
            print("A: \(value)")
        }
    }
    
    try? await Task.sleep(for: .seconds(3))

    // 3秒遅延してから`taskB`でも購読開始
    let taskB = Task {
        for await value in await sender.values {
            print("B: \(value)")
        }
    }
}

/*
A: 1
A: 2
A: 3
B: 1  // <- 途中から購読開始したが「1」を出力してしまっている
A: 4
B: 2
A: 5
B: 3
A: 6
B: 4
A: 7
B: 5
A: 8
B: 6
A: 9
finished
B: 7
B: 8
B: 9
finished
*/

sender自体は同じなのですが、valuesは computed property であるためアクセスするたびに新しいAsyncStreamを生成します。
そのたびに AsyncStream.Continuation と 値を送信するTaskも新しく作られるため、taskAtaskBは同じイベントを共有しておらず、taskBは独立した新しいイベントを購読していることになります。

期待値として同じイベントを共有しているのであればtaskBも値が「4」から流れてきてほしいです。

これは、valuesを computed property ではなくメソッドにしても同様です。

values をメソッドにした例
actor Sender {
    func values() -> AsyncStream<Int> {
        AsyncStream { continuation in
            Task {
                for i in 1...9 {
                    try? await Task.sleep(for: .seconds(1))
                    continuation.yield(i)
                }
            }
            
            continuation.onTermination = { @Sendable termination in
                switch termination {
                    case .cancelled:
                        print("cancelled")
                    case .finished:
                        print("finished")
                }
            }
        }
    }
}

let sender = Sender()

Task {
    let taskA = Task {
        for await value in await sender.values {
            print("A: \(value)")
        }
    }
    
    try? await Task.sleep(for: .seconds(3))

    // 3秒遅延してから`taskB`でも購読開始
    let taskB = Task {
        for await value in await sender.values {
            print("B: \(value)")
        }
    }
}

/*
A: 1
A: 2
A: 3
B: 1
A: 4
B: 2
A: 5
B: 3
A: 6
B: 4
A: 7
B: 5
A: 8
B: 6
A: 9
finished
B: 7
B: 8
B: 9
finished
*/

AsyncStreamを複数箇所で購読する方法

購読を開始する際に生成される AsyncStream.Continuation と 値を送信するTask をそれぞれ管理することで実現することができます。

以下のコードは一例です。

actor Sender {
    private var continuations: [UUID: AsyncStream<Int>.Continuation] = [:]
    private var task: Task<Void, Never>?
    
    init() {
        task = Task {
            for i in 1...9 {
                try? await Task.sleep(for: .seconds(1))
                
                for (_, c) in await continuations {
                    c.yield(i)
                }
            }
            
            for (_, c) in await continuations {
                c.finish()
            }
        }
    }
    
    var values: AsyncStream<Int> {
        AsyncStream { continuation in
            let id = UUID()
            continuations[id] = continuation
            
            continuation.onTermination = { @Sendable termination in
                Task {
                    await self.removeContinuation(for: id)
                    
                    switch termination {
                        case .cancelled:
                            print("cancelled")
                        case .finished:
                            print("finished")
                    }
                }
            }
        }
    }
    
    private func removeContinuation(for id: UUID) {
        continuations.removeValue(forKey: id)
    }
    
    deinit {
        task?.cancel()
        continuations.removeAll()
    }
}

このようにすることで、値が分配されることや、片方のキャンセルにもう片方が影響することを防ぐことができます。

実行結果
let taskA = Task {
    for await value in await sender.values {
        print("A: \(value)")
    }
}

let taskB = Task {
    for await value in await sender.values {
        print("B: \(value)")
    }
}

Task {
    try? await Task.sleep(for: .seconds(5))
    taskA.cancel()
}

/*
B: 1
A: 1
B: 2
A: 2
B: 3
A: 3
B: 4
A: 4
B: 5
cancelled
B: 6
B: 7
B: 8
B: 9
finished
*/

途中から購読開始しても、同じイベントを受信できるようになります。

実行結果
Task {
    let taskA = Task {
        for await value in await sender.values {
            print("A: \(value)")
        }
    }
    
    try? await Task.sleep(for: .seconds(3))
    
    let taskB = Task {
        for await value in await sender.values {
            print("B: \(value)")
        }
    }
}

/*
A: 1
A: 2
A: 3
B: 3  // <- 途中から購読開始しても `taskA` と同じ値を出力してる
A: 4
B: 4
A: 5
B: 5
A: 6
B: 6
A: 7
B: 7
A: 8
B: 8
A: 9
B: 9
finished
finished
*/

上記の例では Sender 内で 値を送信するTask を管理してますが、例えばNWPathMonitorやApple公式のサンプルで書かれているQuakeMonitorのようなハンドラで値が渡ってくるケースも考え方は同様です。

AsyncStreamのクロージャ内では生成されたAsyncStream.Continuationを保持するだけにして、実際に値が渡されるハンドラ内で保持している各AsyncStream.Continuationyield を呼んで値を渡すようにすれば良いです。

class Monitor {
    var handler: ((Int) -> Void)?
    
    private var count: Int = 0
    
    let timer = AsyncStream {
        try? await Task.sleep(for: .seconds(1))
    }
    
    func start() {
        Task {
            for await _ in timer {
                handler?(count)
                count += 1
            }
        }
    }
    
    func stop() {
        handler = nil
    }
}

actor Sender {
    private let monitor = Monitor()
    private var continuations: [UUID: AsyncStream<Int>.Continuation] = [:]
    
    init() {
        monitor.handler = { value in
            Task {
                for (_, c) in await self.continuations {
                    c.yield(value)
                }
            }
        }
        
        monitor.start()
    }
    
    var values: AsyncStream<Int> {
        AsyncStream { continuation in
            let id = UUID()
            continuations[id] = continuation
            
            continuation.onTermination = { @Sendable termination in
                Task {
                    await self.removeContinuation(for: id)
                }
            }
        }
    }
    
    private func removeContinuation(for id: UUID) {
        continuations.removeValue(forKey: id)
        
        if continuations.isEmpty {
            monitor.stop()
        }
    }
}

iOS 26から

iOS 26からはAsyncStream.ContinuationHashableに準拠するため、UUIDでの管理が不要となりSetにすることができます。

公式ドキュメント上はiOS 13.0+となっているのですが、 実際にHashableとして利用できるのは iOS 26.0+ となっています。

actor Sender {
    private var continuations: Set<AsyncStream<Int>.Continuation> = []
    private var task: Task<Void, Never>?
    
    var values: AsyncStream<Int> {
        AsyncStream { continuation in
            continuations.insert(continuation)
            
            continuation.onTermination = { @Sendable termination in
                Task {
                    await self.remove(continuation)
                    
                    switch termination {
                        case .cancelled:
                            print("cancelled")
                        case .finished:
                            print("finished")
                    }
                }
            }
        }
    }
    
    private func remove(_ continuation: AsyncStream<Int>.Continuation) {
        continuations.remove(continuation)
    }
}

結局のところ?

  • 値を送信する側の処理は一度だけ用意
  • 購読を開始するたびにAsyncStreamAsyncStream.Continuationを生成
  • 保持している全てのAsyncStream.Continuationに対して値を送信する

という流れで複数箇所での購読を実現しています。

例として挙げたコード以外にもさまざまな実装方法があると思いますが、複数箇所での購読を実現する場合基本的にこの3点は変わらないかな、と思います。

また過去にはforumsで、Combineを使ってSubjectAsyncStreamでラップする方法も紹介されてたりします。

@preconcurrency importを許容できるのであればCombineを使用するのも1つの手だと思います。
(そもそもCombineだと対応してるし...)

最後に

AsyncStreamを複数箇所で購読する方法をまとめました。

正直なところ、冒頭や前述した通りNotificationCenterやCombineを利用した方が素直に実装できるケースも多いと思います。

  • NotificationCenterだと型安全でないのが気になる
  • Combine@preconcurrency importを許容したくない
  • どうしてもAsyncStreamで実装したい

って人の助けに少しでもなれば幸いです。

追記

swift-async-algorithms 1.1.1からshare()のサポートが入ったことを教えてもらいました!
@shimastriperさん、ありがとうございます!)

swift-async-algorithms は詳しくないのですが、以下のようなコードで動くこと確認できました。
(全然使い方違うとかあったら指摘ください)

import AsyncAlgorithms

actor Sender {
    let values: AsyncStream<Int>

    init() {
        values = AsyncStream<Int> { continuation in
            Task {
                for i in 1...9 {
                    try? await Task.sleep(for: .seconds(1))
                    continuation.yield(i)
                }
                continuation.finish()
            }
            
            continuation.onTermination = { termination in
                switch termination {
                case .cancelled:
                    print("cancelled")
                case .finished:
                    print("finished")
                default:
                    break
                }
            }
        }
    }
}

let share = Sender().values.share()

let taskA = Task {
    for await v in share {
        print("A:", v)
    }
}

try? await Task.sleep(for: .seconds(3))

let taskB = Task {
    for await v in share {
        print("B:", v)
    }
}

try? await Task.sleep(for: .seconds(2))

taskA.cancel()
_ = await taskB.result

/*
A: 1
A: 2
A: 3
B: 4
A: 4
B: 5  // <- 5秒時点で `taskA.cancel()` してるため、以降 `taskA` が出力されなくなってる
B: 6
B: 7
B: 8
finished
B: 9
*/

試してみて、自分の書き方が悪いのかどうなのか、taskA.cancel()した時にcontinuation.onTerminationのcancelledが動いてなさそうでした。

参考

Building an Event Broadcaster in Swift with AsyncStream and Actors.
https://medium.com/@shial184686/building-an-event-broadcaster-in-swift-with-asyncstream-and-actors-feeda3f6b6e4

Consuming an AsyncStream from multiple Tasks
https://forums.swift.org/t/consuming-an-asyncstream-from-multiple-tasks/54453

AsyncSequence / Stream version of PassthroughSubject or CurrentValueSubject?
https://forums.swift.org/t/asyncsequence-stream-version-of-passthroughsubject-or-currentvaluesubject/60395

Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Share.md
https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Share.md

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?