前書き
皆さん、ReactiveSwiftはご存知だろうか?
正直なところ、RxSwiftと比べてシェアは低く、Combineが登場した今、新たに導入することはあまりないと思う。
実際この記事を書いている時点で、QiitaのRxSwiftの記事は562件あるのに対し、ReactiveSwiftの記事は13件しかなく、ReactiveSwiftのシェアの低さを物語っている。
しかし、ReactiveSwiftには他のReactiveProgrammingのライブラリにはない便利なオペレーターが存在する。
今回紹介するthrottle(while:on:)
もそのひとつ。
while
に指定したProperty
の値がtrue
の間、上流から来る値を堰き止め、false
になったタイミングで最後に流れてきた値を下流に流す。
ReactiveSwiftのオペレーターについては@sei_kataokaさんの記事が非常に分かりやすいのでおすすめだ。
今回はこれをCombineのオペレーターとして再現してみた。
方針
Combineに初めから実装されているオペレータは全てPublishersにPubisherを準拠したstruct
やclass
の形で機能が実装されている。
例えば、よく使われるオペレーターのPublisher.map(_:)は戻り値がPublishers.Mapになっている。
extension Publisher {
func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T> {
Publishers.Map(upstream: self, transform: transform)
}
}
これはmap(_:)
の実際の処理がPublishers.Map
に書かれているためで、実際のコードを見た訳ではないがPublisher.map(_:)
では元のPublisher
をPublishers.Map
に置き換えているだけだと思われる。
今回はこれと同じくPublishers
にextension
で新たにPublisher
を追加するやり方でthrottle(while:on:)
を追加していく。
Publisherの作成
ThrottleWhile structの作成
まずPublishers
をextension
してPublisher
に準拠したThrottleWhile
というstruct
を作成する。
extension Publishers {
public struct ThrottleWhile<Upstream, ShouldThrottle, Context>: Publisher where Upstream: Publisher, ShouldThrottle: Publisher<Bool, Never>, Context: Combine.Scheduler {
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure
public let upstream: Upstream
public let shouldThrottle: ShouldThrottle
public let initialThrottle: Bool
public let context: Context
public init(upstream: Upstream, shouldThrottle: ShouldThrottle, initialThrottle: Bool, context: Context){
self.upstream = upstream
self.shouldThrottle = shouldThrottle
self.initialThrottle = initialThrottle
self.context = context
}
プロパティは上流のPublisher
であるupstream: Upstream
と値をスロットルするためのPublisher
であるshouldThrottle: ShouldThrottle
、そしてPublisher
はReactiveSwiftのProperty
のように初期値を持たないため、代わりの初期値となるinitialThrottle: Bool
とスケジューラーであるcontext: Context
だ。
Publisherのtypealiasは、このオペレーターはmap(_:)
のように値を変更するものではないため、上流のものをそのまま使う。
receive(subscriber:)
関数の実装
次にPublisher
のreceive(subscriber:)
関数の実装をする。
public func receive<S>(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input {
let inner = Inner(subscriber: subscriber, upstream: self.upstream, shouldThrottle: self.shouldThrottle, initialThrottle: self.initialThrottle, context: self.context)
subscriber.receive(subscription: inner)
}
この関数は新たにsubscriber
が接続された時に呼び出されるもので、渡されたSubscrber
へSubscription
を渡す。
今回はSubscription
に準拠したInner
classを作成し、それを渡した。
Inner
クラスの作成
Inner
クラスはSubscription
に準拠し、throttle(while:on:)
の実際の処理を書く。
private final class Inner<S: Subscriber>: Subscription where S.Input == Upstream.Output, S.Failure == Upstream.Failure {
private let subscriber: S
private let context: Context
private var cancellables = Set<AnyCancellable>()
private var demand: Subscribers.Demand = .none
private var isThrottling: Bool
private var latestValue: Upstream.Output?
init(subscriber: S, upstream: Upstream, shouldThrottle: ShouldThrottle, initialThrottle: Bool, context: Context) {
self.subscriber = subscriber
self.context = context
isThrottling = initialThrottle
shouldThrottle.sink { [weak self] value in
self?.isThrottling = value
if let latestValue = self?.latestValue {
self?.sendValue(latestValue)
}
}
.store(in: &cancellables)
upstream.sink { [weak self] completion in
self?.subscriber.receive(completion: completion)
} receiveValue: { [weak self] value in
self?.latestValue = value
self?.sendValue(value)
}
.store(in: &cancellables)
}
まず、subscriber
とその他ThrottleWhile
に必要な値をinit
で受け取り、各Publisher
を購読する。
shouldThrottle
の購読ではisThrottling
の値を更新しlatestValue
が設定されていればそれをsendValue(_:)
する。
upstream
の購読ではcompletion
が来たらsubscriber
にcompletion
を送信し、値が来た時はlatestValue
の値を更新しsendValue(_:)
する。
sendValue(_:)
はプライベートメソッドで、isThrottling
がfalse
かつ、demand
が残っているときsubscriber
に値を送信する。
private func sendValue(_ value: Upstream.Output) {
guard !isThrottling, demand > 0 else { return }
context.schedule { [weak self] in
guard let self else { return }
demand += subscriber.receive(value)
}
}
demand
は簡単に言うとsubscriber
が値を送って欲しい回数のことであり、Subscription
で定義されているrequest(_:)
で受け取ることができる。
func request(_ demand: Subscribers.Demand) {
guard demand > 0 else { return }
self.demand += demand
}
あとは、残りのSubscription
が準拠しているCancellable
で定義されているcancel()
を実装すれば完成だ。
func cancel() {
cancellables.forEach { $0.cancel() }
cancellables.removeAll()
}
オペレーターの追加
最後に、先に作成したPublisher
を返却するオペレーターをPublisher
に追加する。
今回は二種類のオペレーターを作成した。
一つ目はshouldThrottle
, initialThrottle
, scheduler
を引数に持つオペレーターでThrottleWhile
に必要な値をそのまま引数にしたものだ。
extension Publisher {
public func throttle<T, S>(while shouldThrottle: T, initialThrottle: Bool, on scheduler: S) -> Publishers.ThrottleWhile<Self, T, S> where T: Publisher<Bool, Never>, S: Combine.Scheduler {
Publishers.ThrottleWhile(upstream: self, shouldThrottle: shouldThrottle, initialThrottle: initialThrottle, context: scheduler)
}
}
利用例
subject
.throttle(while: throttle, initialThrottle: false, on: ImmediateScheduler.shared)
.sink {
print("value: \($0)")
}
.store(in: &cancellables)
二つ目はshouldThrottle
をCurrentValueSubject
に限定することで、initialThrottle
の設定を不要にしたものだ。
これを使えばReactiveSwiftのthrottle(while:on:)
とほぼ同じ使い方ができる。
extension Publisher {
public func throttle<T, S>(while shouldThrottle: T, on scheduler: S) -> Publishers.ThrottleWhile<Self, T, S> where T: CurrentValueSubject<Bool, Never>, S: Combine.Scheduler {
Publishers.ThrottleWhile(upstream: self, shouldThrottle: shouldThrottle, initialThrottle: shouldThrottle.value, context: scheduler)
}
}
利用例
subject
.throttle(while: throttle, on: ImmediateScheduler.shared)
.sink {
print("value: \($0)")
}
.store(in: &cancellables)
ReactiveSwiftのthrottle(while:on:)
の例
producer
.throttle(while: throttle, on: ImmediateScheduler())
.startWithValues {
print("value: \($0)")
}
コード全文
後書き
今回は、CombineでReactiveSwiftのオペレーターを再現してみた。
一般的にオペレーターの追加は既存のオペレーターを利用する形で実装する場合が多いが、今回は公式の実装に倣う形で作成することで、Combineの内部構造について深く理解することができた。
一見するとCombineはプロトコル指向で取っ付きにくいと感じるかもしれないが、実際には拡張性が高く、柔軟に対応できることが分かる。
もし現在Combineを何となく使っていて、内部構造を詳しく知りたければ、オペレーターの作成から始めてみるのが良いだろう。
ところで、既にCombineはObservationへの置き換えが始まっており、ストリーム的な処理もasync-algorithmsで置き換えられるようになっている。
未だSwift6のSendableに対応していないのも含め、Combineが非推奨になる日は近いかもしれない。