CombineからConcurrencyへの移行を進める中で、単純な置き換えができないケースに遭遇しました。
例えば、ユーザのフォロー、アンフォローの機能を担うリポジトリの実装を考えます。
FollowRepositoryはサーバのフォローAPIへのリクエストと、フォロー数をキャッシュする機能を持ちます。
画面上でフォロー操作が行われたら、FollowRepositoryのfollow()
を呼び出します。
follow()
はサーバのフォローAPIをリクエストすると同時に、キャッシュしているフォロー数をインクリメントします。
フォロー数を表示している画面ではlisten()
を呼び出し、フォロー数が変化したという通知を受け取れるようにしていて、通知が来たら画面に反映します。
Combineを使った実装
struct FollowRepository {
private let countSubject = CurrentValueSubject<Int, Never>(0)
func listen() -> AnyPublisher<Int, Never> {
countSubject.eraseToAnyPublisher()
}
func follow() {
// サーバのフォローAPIをリクエスト
// ...
// ローカルキャッシュを更新&通知
countSubject.send(countSubject.value + 1)
}
}
2つの画面でフォロー数を表示していて、2回フォローが実行された後に、1つの画面を閉じてフォロー数の購読をキャンセルしたとします。
最初の2回のフォロー通知はどちらのconsumer
も受け取れていますが、consumer2
は途中で購読をキャンセルしているのでconsumer1
だけが3
と4
の通知を受け取っています。
let repository = FollowRepository()
let cancellable1 = repository.listen().sink {
print("consumer1: \($0)")
}
let cancellable2 = repository.listen().sink {
print("consumer2: \($0)")
}
repository.follow()
repository.follow()
cancellable2.cancel() // 画面2を閉じた
repository.follow()
repository.follow()
// 実行結果
// consumer1: 0
// consumer2: 0
// consumer1: 1
// consumer2: 1
// consumer1: 2
// consumer2: 2
// consumer1: 3
// consumer1: 4
AsyncStreamを使った実装
上記の実装をAsyncStreamで置き換えてみます。
listen()
が呼ばれたときにAsyncStreamが生成されますが、このとき受け取るcontinuation
をプロパティで保持しておく必要があります。
follow()
が呼ばれたときにcontinuation.yield()
を呼び出すことで、AsyncStreamのConsumerに値の更新を通知できるようにするためです。
actor FollowRepository {
private lazy var stream: AsyncStream<Int> = {
AsyncStream { continuation in
self.continuation = continuation
continuation.yield(count)
continuation.onTermination = { _ in }
}
}()
private var continuation: AsyncStream<Int>.Continuation?
private var count = 0
func listen() -> AsyncStream<Int> {
stream
}
func follow() {
// サーバのフォローAPIをリクエスト
// ...
// ローカルキャッシュを更新&通知
count += 1
continuation?.yield(count)
}
}
一見問題なさそうに見えますが、結果は以下のように想定と異なります。
consumer1
は購読直後に初期値である0
を受け取り、その後のフォローの通知はconsumer1
とconsumer2
が交互に受け取っている形です。
さらに、consumer2
のタスクをキャンセルするとストリームがterminated状態になってしまい、購読を継続しているはずのconsumer1
もその後の通知を受け取れなくなっています。
let repository = FollowRepository()
let task1 = Task.detached {
for await i in await repository.listen() {
print("consumer1: \(i)")
}
}
let task2 = Task.detached {
for await i in await repository.listen() {
print("consumer2: \(i)")
}
}
Task.detached {
try await Task.sleep(nanoseconds: 1_000_000_000)
await repository.follow()
try await Task.sleep(nanoseconds: 1_000_000_000)
await repository.follow()
try await Task.sleep(nanoseconds: 1_000_000_000)
await repository.follow()
try await Task.sleep(nanoseconds: 1_000_000_000)
await repository.follow()
task2.cancel()
try await Task.sleep(nanoseconds: 1_000_000_000)
await repository.follow()
try await Task.sleep(nanoseconds: 1_000_000_000)
await repository.follow()
}
// 実行結果
// consumer1: 0
// consumer1: 1
// consumer2: 2
// consumer1: 3
// consumer2: 4
実は、AsyncStreamは複数のConsumerを持てない仕様になっています。
Swiftのフォーラムで関連するスレをいくつか見つけました。
AsyncStreamとCombineを組み合わせた実装
上記スレで、AsyncStreamとCombineと組み合わせることで複数のConsumerに値をブロードキャストする実装を提案してくれてる人がいました。
それを適用したのが以下の実装です。
キャッシュしたい値をCurrentValueSubjectとして保持します。
AsyncStream生成時にこれをsinkし、新しい値を受け取ったらAsyncStreamのストリームに流すようにします。
follow()
が呼ばれたらCurrentValueSubjectに新しい値を通知します。するとAsyncStreamのConsumerに値が通知されるという仕組みです。
listen()
が呼ばれるたびに新しいストリームが生成され、Consumerはそれぞれ別々のストリームを購読します。
それぞれのストリームの内部では同じCurrentValueSubjectのインスタンスを参照しているので、CurrentValueSubjectに新しい値が通知されると全てのConsumerに値がブロードキャストされるというわけです。
actor FollowRepository {
private let countSubject = CurrentValueSubject<Int, Never>(0)
func listen() -> AsyncStream<Int> {
AsyncStream { continuation in
let cancellable = countSubject.sink {
continuation.yield($0)
}
continuation.onTermination = { _ in
cancellable.cancel()
}
}
}
func follow() {
// サーバのフォローAPIをリクエスト
// ...
// ローカルキャッシュを更新&通知
countSubject.send(countSubject.value + 1)
}
}
結果は以下の通りで、最初のCombineのみで実装されたものと同じ結果になりました。
2つのConsumerは購読時に初期値である0
を受け取り、2回フォローが実行されるとどちらも1
と2
を受け取り、consumer2
が購読をキャンセルしたあともconsumer1
は引き続きフォローの通知を受け取ることができています。
let repository = FollowRepository()
let task1 = Task.detached {
for await i in await repository.listen() {
print("consumer1: \(i)")
}
}
let task2 = Task.detached {
for await i in await repository.listen() {
print("consumer2: \(i)")
}
}
Task.detached {
try await Task.sleep(nanoseconds: 1_000_000_000)
await repository.follow()
try await Task.sleep(nanoseconds: 1_000_000_000)
await repository.follow()
task2.cancel()
try await Task.sleep(nanoseconds: 1_000_000_000)
await repository.follow()
try await Task.sleep(nanoseconds: 1_000_000_000)
await repository.follow()
}
// 実行結果
// consumer1: 0
// consumer2: 0
// consumer1: 1
// consumer2: 1
// consumer1: 2
// consumer2: 2
// consumer1: 3
// consumer1: 4
あとがき
Combineから移行したいのに、結局Combineの実装が残るという結果になったのが少し残念ではありますが、クライアントはCombineを意識しなくて済むので一旦良しとします。
AsyncStreamで複数のConsumerが購読したいケースは割と多いと思うのですが、今後このようなケースに対応した新しい型は登場するのでしょうか?