19
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AsyncStreamで複数のConsumerに値をブロードキャストする

Posted at

CombineからConcurrencyへの移行を進める中で、単純な置き換えができないケースに遭遇しました。

例えば、ユーザのフォロー、アンフォローの機能を担うリポジトリの実装を考えます。

FollowRepositoryはサーバのフォローAPIへのリクエストと、フォロー数をキャッシュする機能を持ちます。
画面上でフォロー操作が行われたら、FollowRepositoryのfollow()を呼び出します。
follow()はサーバのフォローAPIをリクエストすると同時に、キャッシュしているフォロー数をインクリメントします。

フォロー数を表示している画面ではlisten()を呼び出し、フォロー数が変化したという通知を受け取れるようにしていて、通知が来たら画面に反映します。

Combineを使った実装

struct FollowRepository {
    private let countSubject = CurrentValueSubject<Int, Never>(0)

    func listen() -> AnyPublisher<Int, Never> {
        countSubject.eraseToAnyPublisher()
    }

    func follow() {
        // サーバのフォローAPIをリクエスト
        // ...

        // ローカルキャッシュを更新&通知
        countSubject.send(countSubject.value + 1)
    }
}

2つの画面でフォロー数を表示していて、2回フォローが実行された後に、1つの画面を閉じてフォロー数の購読をキャンセルしたとします。
最初の2回のフォロー通知はどちらのconsumerも受け取れていますが、consumer2は途中で購読をキャンセルしているのでconsumer1だけが34の通知を受け取っています。

let repository = FollowRepository()

let cancellable1 = repository.listen().sink {
    print("consumer1: \($0)")
}

let cancellable2 = repository.listen().sink {
    print("consumer2: \($0)")
}

repository.follow()
repository.follow()
cancellable2.cancel()  // 画面2を閉じた
repository.follow()
repository.follow()

// 実行結果
// consumer1: 0
// consumer2: 0
// consumer1: 1
// consumer2: 1
// consumer1: 2
// consumer2: 2
// consumer1: 3
// consumer1: 4

AsyncStreamを使った実装

上記の実装をAsyncStreamで置き換えてみます。

listen()が呼ばれたときにAsyncStreamが生成されますが、このとき受け取るcontinuationをプロパティで保持しておく必要があります。
follow()が呼ばれたときにcontinuation.yield()を呼び出すことで、AsyncStreamのConsumerに値の更新を通知できるようにするためです。

actor FollowRepository {
    private lazy var stream: AsyncStream<Int> = {
        AsyncStream { continuation in
            self.continuation = continuation
            continuation.yield(count)
            continuation.onTermination = { _ in }
        }
    }()

    private var continuation: AsyncStream<Int>.Continuation?
    private var count = 0

    func listen() -> AsyncStream<Int> {
        stream
    }

    func follow() {
        // サーバのフォローAPIをリクエスト
        // ...

        // ローカルキャッシュを更新&通知
        count += 1
        continuation?.yield(count)
    }
}

一見問題なさそうに見えますが、結果は以下のように想定と異なります。
consumer1は購読直後に初期値である0を受け取り、その後のフォローの通知はconsumer1consumer2が交互に受け取っている形です。
さらに、consumer2のタスクをキャンセルするとストリームがterminated状態になってしまい、購読を継続しているはずのconsumer1もその後の通知を受け取れなくなっています。

let repository = FollowRepository()

let task1 = Task.detached {
    for await i in await repository.listen() {
        print("consumer1: \(i)")
    }
}

let task2 = Task.detached {
    for await i in await repository.listen() {
        print("consumer2: \(i)")
    }
}

Task.detached {
    try await Task.sleep(nanoseconds: 1_000_000_000)
    await repository.follow()
    try await Task.sleep(nanoseconds: 1_000_000_000)
    await repository.follow()
    try await Task.sleep(nanoseconds: 1_000_000_000)
    await repository.follow()
    try await Task.sleep(nanoseconds: 1_000_000_000)
    await repository.follow()

    task2.cancel()

    try await Task.sleep(nanoseconds: 1_000_000_000)
    await repository.follow()
    try await Task.sleep(nanoseconds: 1_000_000_000)
    await repository.follow()
}

// 実行結果
// consumer1: 0
// consumer1: 1
// consumer2: 2
// consumer1: 3
// consumer2: 4

実は、AsyncStreamは複数のConsumerを持てない仕様になっています。
Swiftのフォーラムで関連するスレをいくつか見つけました。

AsyncStreamとCombineを組み合わせた実装

上記スレで、AsyncStreamとCombineと組み合わせることで複数のConsumerに値をブロードキャストする実装を提案してくれてる人がいました。

それを適用したのが以下の実装です。
キャッシュしたい値をCurrentValueSubjectとして保持します。
AsyncStream生成時にこれをsinkし、新しい値を受け取ったらAsyncStreamのストリームに流すようにします。
follow()が呼ばれたらCurrentValueSubjectに新しい値を通知します。するとAsyncStreamのConsumerに値が通知されるという仕組みです。

listen()が呼ばれるたびに新しいストリームが生成され、Consumerはそれぞれ別々のストリームを購読します。
それぞれのストリームの内部では同じCurrentValueSubjectのインスタンスを参照しているので、CurrentValueSubjectに新しい値が通知されると全てのConsumerに値がブロードキャストされるというわけです。

actor FollowRepository {
    private let countSubject = CurrentValueSubject<Int, Never>(0)

    func listen() -> AsyncStream<Int> {
        AsyncStream { continuation in
            let cancellable = countSubject.sink {
                continuation.yield($0)
            }
            continuation.onTermination = { _ in
                cancellable.cancel()
            }
        }
    }

    func follow() {
        // サーバのフォローAPIをリクエスト
        // ...

        // ローカルキャッシュを更新&通知
        countSubject.send(countSubject.value + 1)
    }
}

結果は以下の通りで、最初のCombineのみで実装されたものと同じ結果になりました。
2つのConsumerは購読時に初期値である0を受け取り、2回フォローが実行されるとどちらも12を受け取り、consumer2が購読をキャンセルしたあともconsumer1は引き続きフォローの通知を受け取ることができています。

let repository = FollowRepository()

let task1 = Task.detached {
    for await i in await repository.listen() {
        print("consumer1: \(i)")
    }
}

let task2 = Task.detached {
    for await i in await repository.listen() {
        print("consumer2: \(i)")
    }
}

Task.detached {
    try await Task.sleep(nanoseconds: 1_000_000_000)
    await repository.follow()
    try await Task.sleep(nanoseconds: 1_000_000_000)
    await repository.follow()

    task2.cancel()

    try await Task.sleep(nanoseconds: 1_000_000_000)
    await repository.follow()
    try await Task.sleep(nanoseconds: 1_000_000_000)
    await repository.follow()
}

// 実行結果
// consumer1: 0
// consumer2: 0
// consumer1: 1
// consumer2: 1
// consumer1: 2
// consumer2: 2
// consumer1: 3
// consumer1: 4

あとがき

Combineから移行したいのに、結局Combineの実装が残るという結果になったのが少し残念ではありますが、クライアントはCombineを意識しなくて済むので一旦良しとします。

AsyncStreamで複数のConsumerが購読したいケースは割と多いと思うのですが、今後このようなケースに対応した新しい型は登場するのでしょうか?

19
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
19
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?