はじめに
AsyncStreamでマルチブロードキャストを実現するための方法です。
前提として、AsyncStream自体は複数箇所での購読に対応していないです。
forumsにも複数TaskからAsyncStreamを使用する方法の質問が投げられたりもしていました。
そんな中、SE-0468がSwift 6.2で実装されたため、複数箇所での購読を実現する方法を調べてまとめました。
そもそもNotificationCenterを使えばいいんじゃねとか思ったり。iOS 26から型安全に使えるようになったし
期待通りに動かない例
例えば以下のようなコードでは taskA と taskB で値が分配されてしまいます。
actor Sender {
let values: AsyncStream<Int>
init() {
values = AsyncStream { continuation in
Task {
for i in 1...9 {
try? await Task.sleep(for: .seconds(1))
continuation.yield(i)
}
continuation.finish()
}
continuation.onTermination = { @Sendable termination in
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
}
}
}
}
let sender = Sender()
let taskA = Task {
for await value in await sender.values {
print("A: \(value)")
}
}
let taskB = Task {
for await value in await sender.values {
print("B: \(value)")
}
}
/*
A: 1
B: 2
A: 3
B: 4
A: 5
B: 6
A: 7
B: 8
A: 9
finished
*/
さらに、途中で taskA をキャンセルすると taskB も一緒にキャンセルされてしまいます。
...
Task {
try? await Task.sleep(for: .seconds(5))
taskA.cancel()
}
/*
A: 1
B: 2
A: 3
B: 4
cancelled
*/
期待通りに動かない例 2
それなら次のコードだとどうでしょう?
actor Sender {
var values: AsyncStream<Int> {
AsyncStream { continuation in
Task {
for i in 1...9 {
try? await Task.sleep(for: .seconds(1))
continuation.yield(i)
}
continuation.finish()
}
continuation.onTermination = { @Sendable termination in
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
}
}
}
}
let sender = Sender()
let taskA = Task {
for await value in await sender.values {
print("A: \(value)")
}
}
let taskB = Task {
for await value in await sender.values {
print("B: \(value)")
}
}
Task {
try? await Task.sleep(for: .seconds(5))
taskA.cancel()
}
/*
B: 1
A: 1
A: 2
B: 2
B: 3
A: 3
A: 4
B: 4
cancelled
B: 5
B: 6
B: 7
B: 8
B: 9
finished
*/
途中で taskA をキャンセルしてもその後 taskB には値が流れており一見よさそうに見えます。
しかしこれで本当に良いのでしょうか?
例えば以下のようにして確認してみましょう
Task {
let taskA = Task {
for await value in await sender.values {
print("A: \(value)")
}
}
try? await Task.sleep(for: .seconds(3))
// 3秒遅延してから`taskB`でも購読開始
let taskB = Task {
for await value in await sender.values {
print("B: \(value)")
}
}
}
/*
A: 1
A: 2
A: 3
B: 1 // <- 途中から購読開始したが「1」を出力してしまっている
A: 4
B: 2
A: 5
B: 3
A: 6
B: 4
A: 7
B: 5
A: 8
B: 6
A: 9
finished
B: 7
B: 8
B: 9
finished
*/
sender自体は同じなのですが、valuesは computed property であるためアクセスするたびに新しいAsyncStreamを生成します。
そのたびに AsyncStream.Continuation と 値を送信するTaskも新しく作られるため、taskAとtaskBは同じイベントを共有しておらず、taskBは独立した新しいイベントを購読していることになります。
期待値として同じイベントを共有しているのであればtaskBも値が「4」から流れてきてほしいです。
これは、valuesを computed property ではなくメソッドにしても同様です。
values をメソッドにした例
actor Sender {
func values() -> AsyncStream<Int> {
AsyncStream { continuation in
Task {
for i in 1...9 {
try? await Task.sleep(for: .seconds(1))
continuation.yield(i)
}
}
continuation.onTermination = { @Sendable termination in
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
}
}
}
}
let sender = Sender()
Task {
let taskA = Task {
for await value in await sender.values {
print("A: \(value)")
}
}
try? await Task.sleep(for: .seconds(3))
// 3秒遅延してから`taskB`でも購読開始
let taskB = Task {
for await value in await sender.values {
print("B: \(value)")
}
}
}
/*
A: 1
A: 2
A: 3
B: 1
A: 4
B: 2
A: 5
B: 3
A: 6
B: 4
A: 7
B: 5
A: 8
B: 6
A: 9
finished
B: 7
B: 8
B: 9
finished
*/
AsyncStreamを複数箇所で購読する方法
購読を開始する際に生成される AsyncStream.Continuation と 値を送信するTask をそれぞれ管理することで実現することができます。
以下のコードは一例です。
actor Sender {
private var continuations: [UUID: AsyncStream<Int>.Continuation] = [:]
private var task: Task<Void, Never>?
init() {
task = Task {
for i in 1...9 {
try? await Task.sleep(for: .seconds(1))
for (_, c) in await continuations {
c.yield(i)
}
}
for (_, c) in await continuations {
c.finish()
}
}
}
var values: AsyncStream<Int> {
AsyncStream { continuation in
let id = UUID()
continuations[id] = continuation
continuation.onTermination = { @Sendable termination in
Task {
await self.removeContinuation(for: id)
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
}
}
}
}
private func removeContinuation(for id: UUID) {
continuations.removeValue(forKey: id)
}
deinit {
task?.cancel()
continuations.removeAll()
}
}
このようにすることで、値が分配されることや、片方のキャンセルにもう片方が影響することを防ぐことができます。
実行結果
let taskA = Task {
for await value in await sender.values {
print("A: \(value)")
}
}
let taskB = Task {
for await value in await sender.values {
print("B: \(value)")
}
}
Task {
try? await Task.sleep(for: .seconds(5))
taskA.cancel()
}
/*
B: 1
A: 1
B: 2
A: 2
B: 3
A: 3
B: 4
A: 4
B: 5
cancelled
B: 6
B: 7
B: 8
B: 9
finished
*/
途中から購読開始しても、同じイベントを受信できるようになります。
実行結果
Task {
let taskA = Task {
for await value in await sender.values {
print("A: \(value)")
}
}
try? await Task.sleep(for: .seconds(3))
let taskB = Task {
for await value in await sender.values {
print("B: \(value)")
}
}
}
/*
A: 1
A: 2
A: 3
B: 3 // <- 途中から購読開始しても `taskA` と同じ値を出力してる
A: 4
B: 4
A: 5
B: 5
A: 6
B: 6
A: 7
B: 7
A: 8
B: 8
A: 9
B: 9
finished
finished
*/
上記の例では Sender 内で 値を送信するTask を管理してますが、例えばNWPathMonitorやApple公式のサンプルで書かれているQuakeMonitorのようなハンドラで値が渡ってくるケースも考え方は同様です。
AsyncStreamのクロージャ内では生成されたAsyncStream.Continuationを保持するだけにして、実際に値が渡されるハンドラ内で保持している各AsyncStream.Continuationで yield を呼んで値を渡すようにすれば良いです。
class Monitor {
var handler: ((Int) -> Void)?
private var count: Int = 0
let timer = AsyncStream {
try? await Task.sleep(for: .seconds(1))
}
func start() {
Task {
for await _ in timer {
handler?(count)
count += 1
}
}
}
func stop() {
handler = nil
}
}
actor Sender {
private let monitor = Monitor()
private var continuations: [UUID: AsyncStream<Int>.Continuation] = [:]
init() {
monitor.handler = { value in
Task {
for (_, c) in await self.continuations {
c.yield(value)
}
}
}
monitor.start()
}
var values: AsyncStream<Int> {
AsyncStream { continuation in
let id = UUID()
continuations[id] = continuation
continuation.onTermination = { @Sendable termination in
Task {
await self.removeContinuation(for: id)
}
}
}
}
private func removeContinuation(for id: UUID) {
continuations.removeValue(forKey: id)
if continuations.isEmpty {
monitor.stop()
}
}
}
iOS 26から
iOS 26からはAsyncStream.ContinuationがHashableに準拠するため、UUIDでの管理が不要となりSetにすることができます。
公式ドキュメント上はiOS 13.0+となっているのですが、 実際にHashableとして利用できるのは iOS 26.0+ となっています。
actor Sender {
private var continuations: Set<AsyncStream<Int>.Continuation> = []
private var task: Task<Void, Never>?
var values: AsyncStream<Int> {
AsyncStream { continuation in
continuations.insert(continuation)
continuation.onTermination = { @Sendable termination in
Task {
await self.remove(continuation)
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
}
}
}
}
}
private func remove(_ continuation: AsyncStream<Int>.Continuation) {
continuations.remove(continuation)
}
}
結局のところ?
- 値を送信する側の処理は一度だけ用意
- 購読を開始するたびに
AsyncStreamとAsyncStream.Continuationを生成 - 保持している全ての
AsyncStream.Continuationに対して値を送信する
という流れで複数箇所での購読を実現しています。
例として挙げたコード以外にもさまざまな実装方法があると思いますが、複数箇所での購読を実現する場合基本的にこの3点は変わらないかな、と思います。
また過去にはforumsで、Combineを使ってSubjectをAsyncStreamでラップする方法も紹介されてたりします。
@preconcurrency importを許容できるのであればCombineを使用するのも1つの手だと思います。
(そもそもCombineだと対応してるし...)
最後に
AsyncStreamを複数箇所で購読する方法をまとめました。
正直なところ、冒頭や前述した通りNotificationCenterやCombineを利用した方が素直に実装できるケースも多いと思います。
-
NotificationCenterだと型安全でないのが気になる -
Combineの@preconcurrency importを許容したくない - どうしても
AsyncStreamで実装したい
って人の助けに少しでもなれば幸いです。
追記
swift-async-algorithms 1.1.1からshare()のサポートが入ったことを教えてもらいました!
(@shimastriperさん、ありがとうございます!)
swift-async-algorithms は詳しくないのですが、以下のようなコードで動くこと確認できました。
(全然使い方違うとかあったら指摘ください)
import AsyncAlgorithms
actor Sender {
let values: AsyncStream<Int>
init() {
values = AsyncStream<Int> { continuation in
Task {
for i in 1...9 {
try? await Task.sleep(for: .seconds(1))
continuation.yield(i)
}
continuation.finish()
}
continuation.onTermination = { termination in
switch termination {
case .cancelled:
print("cancelled")
case .finished:
print("finished")
default:
break
}
}
}
}
}
let share = Sender().values.share()
let taskA = Task {
for await v in share {
print("A:", v)
}
}
try? await Task.sleep(for: .seconds(3))
let taskB = Task {
for await v in share {
print("B:", v)
}
}
try? await Task.sleep(for: .seconds(2))
taskA.cancel()
_ = await taskB.result
/*
A: 1
A: 2
A: 3
B: 4
A: 4
B: 5 // <- 5秒時点で `taskA.cancel()` してるため、以降 `taskA` が出力されなくなってる
B: 6
B: 7
B: 8
finished
B: 9
*/
試してみて、自分の書き方が悪いのかどうなのか、taskA.cancel()した時にcontinuation.onTerminationのcancelledが動いてなさそうでした。
参考
Building an Event Broadcaster in Swift with AsyncStream and Actors.
https://medium.com/@shial184686/building-an-event-broadcaster-in-swift-with-asyncstream-and-actors-feeda3f6b6e4
Consuming an AsyncStream from multiple Tasks
https://forums.swift.org/t/consuming-an-asyncstream-from-multiple-tasks/54453
AsyncSequence / Stream version of PassthroughSubject or CurrentValueSubject?
https://forums.swift.org/t/asyncsequence-stream-version-of-passthroughsubject-or-currentvaluesubject/60395
Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Share.md
https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Share.md