前書き
皆さん、ReactiveSwiftはご存知だろうか?
正直なところ、RxSwiftと比べてシェアは低く、Combineが登場した今、新たに導入することはあまりないと思う。
実際この記事を書いている時点で、QiitaのRxSwiftの記事は562件あるのに対し、ReactiveSwiftの記事は13件しかなく、ReactiveSwiftのシェアの低さを物語っている。
しかし、ReactiveSwiftには他のReactiveProgrammingのライブラリにはない便利なオペレーターが存在する。
今回紹介するthrottle(while:on:)もそのひとつ。
whileに指定したPropertyの値がtrueの間、上流から来る値を堰き止め、falseになったタイミングで最後に流れてきた値を下流に流す。
ReactiveSwiftのオペレーターについては@sei_kataokaさんの記事が非常に分かりやすいのでおすすめだ。
今回はこれをCombineのオペレーターとして再現してみた。
方針
Combineに初めから実装されているオペレータは全てPublishersにPubisherを準拠したstructやclassの形で機能が実装されている。
例えば、よく使われるオペレーターのPublisher.map(_:)は戻り値がPublishers.Mapになっている。
extension Publisher {
func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T> {
Publishers.Map(upstream: self, transform: transform)
}
}
これはmap(_:)の実際の処理がPublishers.Mapに書かれているためで、実際のコードを見た訳ではないがPublisher.map(_:)では元のPublisherをPublishers.Mapに置き換えているだけだと思われる。
今回はこれと同じくPublishersにextensionで新たにPublisherを追加するやり方でthrottle(while:on:)を追加していく。
Publisherの作成
ThrottleWhile structの作成
まずPublishersをextensionしてPublisherに準拠したThrottleWhileというstructを作成する。
extension Publishers {
public struct ThrottleWhile<Upstream, ShouldThrottle, Context>: Publisher where Upstream: Publisher, ShouldThrottle: Publisher<Bool, Never>, Context: Combine.Scheduler {
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure
public let upstream: Upstream
public let shouldThrottle: ShouldThrottle
public let initialThrottle: Bool
public let context: Context
public init(upstream: Upstream, shouldThrottle: ShouldThrottle, initialThrottle: Bool, context: Context){
self.upstream = upstream
self.shouldThrottle = shouldThrottle
self.initialThrottle = initialThrottle
self.context = context
}
プロパティは上流のPublisherであるupstream: Upstreamと値をスロットルするためのPublisherであるshouldThrottle: ShouldThrottle、そしてPublisherはReactiveSwiftのPropertyのように初期値を持たないため、代わりの初期値となるinitialThrottle: Boolとスケジューラーであるcontext: Contextだ。
Publisherのtypealiasは、このオペレーターはmap(_:)のように値を変更するものではないため、上流のものをそのまま使う。
receive(subscriber:)関数の実装
次にPublisherのreceive(subscriber:)関数の実装をする。
public func receive<S>(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input {
let inner = Inner(subscriber: subscriber, upstream: self.upstream, shouldThrottle: self.shouldThrottle, initialThrottle: self.initialThrottle, context: self.context)
subscriber.receive(subscription: inner)
}
この関数は新たにsubscriberが接続された時に呼び出されるもので、渡されたSubscrberへSubscriptionを渡す。
今回はSubscriptionに準拠したInner classを作成し、それを渡した。
Innerクラスの作成
InnerクラスはSubscriptionに準拠し、throttle(while:on:)の実際の処理を書く。
private final class Inner<S: Subscriber>: Subscription where S.Input == Upstream.Output, S.Failure == Upstream.Failure {
private let subscriber: S
private let context: Context
private var cancellables = Set<AnyCancellable>()
private var demand: Subscribers.Demand = .none
private var isThrottling: Bool
private var latestValue: Upstream.Output?
init(subscriber: S, upstream: Upstream, shouldThrottle: ShouldThrottle, initialThrottle: Bool, context: Context) {
self.subscriber = subscriber
self.context = context
isThrottling = initialThrottle
shouldThrottle.sink { [weak self] value in
self?.isThrottling = value
if let latestValue = self?.latestValue {
self?.sendValue(latestValue)
}
}
.store(in: &cancellables)
upstream.sink { [weak self] completion in
self?.subscriber.receive(completion: completion)
} receiveValue: { [weak self] value in
self?.latestValue = value
self?.sendValue(value)
}
.store(in: &cancellables)
}
まず、subscriberとその他ThrottleWhileに必要な値をinitで受け取り、各Publisherを購読する。
shouldThrottleの購読ではisThrottlingの値を更新しlatestValueが設定されていればそれをsendValue(_:)する。
upstreamの購読ではcompletionが来たらsubscriberにcompletionを送信し、値が来た時はlatestValueの値を更新しsendValue(_:)する。
sendValue(_:)はプライベートメソッドで、isThrottlingがfalseかつ、demandが残っているときsubscriberに値を送信する。
private func sendValue(_ value: Upstream.Output) {
guard !isThrottling, demand > 0 else { return }
context.schedule { [weak self] in
guard let self else { return }
demand += subscriber.receive(value)
}
}
demandは簡単に言うとsubscriberが値を送って欲しい回数のことであり、Subscriptionで定義されているrequest(_:)で受け取ることができる。
func request(_ demand: Subscribers.Demand) {
guard demand > 0 else { return }
self.demand += demand
}
あとは、残りのSubscriptionが準拠しているCancellableで定義されているcancel()を実装すれば完成だ。
func cancel() {
cancellables.forEach { $0.cancel() }
cancellables.removeAll()
}
オペレーターの追加
最後に、先に作成したPublisherを返却するオペレーターをPublisherに追加する。
今回は二種類のオペレーターを作成した。
一つ目はshouldThrottle, initialThrottle, schedulerを引数に持つオペレーターでThrottleWhileに必要な値をそのまま引数にしたものだ。
extension Publisher {
public func throttle<T, S>(while shouldThrottle: T, initialThrottle: Bool, on scheduler: S) -> Publishers.ThrottleWhile<Self, T, S> where T: Publisher<Bool, Never>, S: Combine.Scheduler {
Publishers.ThrottleWhile(upstream: self, shouldThrottle: shouldThrottle, initialThrottle: initialThrottle, context: scheduler)
}
}
利用例
subject
.throttle(while: throttle, initialThrottle: false, on: ImmediateScheduler.shared)
.sink {
print("value: \($0)")
}
.store(in: &cancellables)
二つ目はshouldThrottleをCurrentValueSubjectに限定することで、initialThrottleの設定を不要にしたものだ。
これを使えばReactiveSwiftのthrottle(while:on:)とほぼ同じ使い方ができる。
extension Publisher {
public func throttle<T, S>(while shouldThrottle: T, on scheduler: S) -> Publishers.ThrottleWhile<Self, T, S> where T: CurrentValueSubject<Bool, Never>, S: Combine.Scheduler {
Publishers.ThrottleWhile(upstream: self, shouldThrottle: shouldThrottle, initialThrottle: shouldThrottle.value, context: scheduler)
}
}
利用例
subject
.throttle(while: throttle, on: ImmediateScheduler.shared)
.sink {
print("value: \($0)")
}
.store(in: &cancellables)
ReactiveSwiftのthrottle(while:on:)の例
producer
.throttle(while: throttle, on: ImmediateScheduler())
.startWithValues {
print("value: \($0)")
}
コード全文
後書き
今回は、CombineでReactiveSwiftのオペレーターを再現してみた。
一般的にオペレーターの追加は既存のオペレーターを利用する形で実装する場合が多いが、今回は公式の実装に倣う形で作成することで、Combineの内部構造について深く理解することができた。
一見するとCombineはプロトコル指向で取っ付きにくいと感じるかもしれないが、実際には拡張性が高く、柔軟に対応できることが分かる。
もし現在Combineを何となく使っていて、内部構造を詳しく知りたければ、オペレーターの作成から始めてみるのが良いだろう。
ところで、既にCombineはObservationへの置き換えが始まっており、ストリーム的な処理もasync-algorithmsで置き換えられるようになっている。
未だSwift6のSendableに対応していないのも含め、Combineが非推奨になる日は近いかもしれない。