0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

前書き

皆さん、ReactiveSwiftはご存知だろうか?

正直なところ、RxSwiftと比べてシェアは低く、Combineが登場した今、新たに導入することはあまりないと思う。

実際この記事を書いている時点で、QiitaのRxSwiftの記事は562件あるのに対し、ReactiveSwiftの記事は13件しかなく、ReactiveSwiftのシェアの低さを物語っている。

しかし、ReactiveSwiftには他のReactiveProgrammingのライブラリにはない便利なオペレーターが存在する。

今回紹介するthrottle(while:on:)もそのひとつ。

whileに指定したPropertyの値がtrueの間、上流から来る値を堰き止め、falseになったタイミングで最後に流れてきた値を下流に流す。

ReactiveSwiftのオペレーターについては@sei_kataokaさんの記事が非常に分かりやすいのでおすすめだ。

今回はこれをCombineのオペレーターとして再現してみた。

方針

Combineに初めから実装されているオペレータは全てPublishersPubisherを準拠したstructclassの形で機能が実装されている。

例えば、よく使われるオペレーターのPublisher.map(_:)は戻り値がPublishers.Mapになっている。

extension Publisher {
    func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T> {
        Publishers.Map(upstream: self, transform: transform)
    }
}

これはmap(_:)の実際の処理がPublishers.Mapに書かれているためで、実際のコードを見た訳ではないがPublisher.map(_:)では元のPublisherPublishers.Mapに置き換えているだけだと思われる。

今回はこれと同じくPublishersextensionで新たにPublisherを追加するやり方でthrottle(while:on:)を追加していく。

Publisherの作成

ThrottleWhile structの作成

まずPublishersextensionしてPublisherに準拠したThrottleWhileというstructを作成する。

extension Publishers {
    public struct ThrottleWhile<Upstream, ShouldThrottle, Context>: Publisher where Upstream: Publisher, ShouldThrottle: Publisher<Bool, Never>, Context: Combine.Scheduler {
        public typealias Output = Upstream.Output
        public typealias Failure = Upstream.Failure

        public let upstream: Upstream
        public let shouldThrottle: ShouldThrottle
        public let initialThrottle: Bool
        public let context: Context

        public init(upstream: Upstream, shouldThrottle: ShouldThrottle, initialThrottle: Bool, context: Context){
            self.upstream = upstream
            self.shouldThrottle = shouldThrottle
            self.initialThrottle = initialThrottle
            self.context = context
        }

プロパティは上流のPublisherであるupstream: Upstreamと値をスロットルするためのPublisherであるshouldThrottle: ShouldThrottle、そしてPublisherはReactiveSwiftのPropertyのように初期値を持たないため、代わりの初期値となるinitialThrottle: Boolとスケジューラーであるcontext: Contextだ。

Publisherのtypealiasは、このオペレーターはmap(_:)のように値を変更するものではないため、上流のものをそのまま使う。

receive(subscriber:)関数の実装

次にPublisherreceive(subscriber:)関数の実装をする。

        public func receive<S>(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input {
            let inner = Inner(subscriber: subscriber, upstream: self.upstream, shouldThrottle: self.shouldThrottle, initialThrottle: self.initialThrottle, context: self.context)
            subscriber.receive(subscription: inner)
        }

この関数は新たにsubscriberが接続された時に呼び出されるもので、渡されたSubscrberSubscriptionを渡す。
今回はSubscriptionに準拠したInner classを作成し、それを渡した。

Innerクラスの作成

InnerクラスはSubscriptionに準拠し、throttle(while:on:)の実際の処理を書く。

        private final class Inner<S: Subscriber>: Subscription where S.Input == Upstream.Output, S.Failure == Upstream.Failure {
            private let subscriber: S
            private let context: Context

            private var cancellables = Set<AnyCancellable>()

            private var demand: Subscribers.Demand = .none

            private var isThrottling: Bool
            private var latestValue: Upstream.Output?

            init(subscriber: S, upstream: Upstream, shouldThrottle: ShouldThrottle, initialThrottle: Bool, context: Context) {
                self.subscriber = subscriber
                self.context = context

                isThrottling = initialThrottle

                shouldThrottle.sink { [weak self] value in
                    self?.isThrottling = value
                    if let latestValue = self?.latestValue {
                        self?.sendValue(latestValue)
                    }
                }
                .store(in: &cancellables)

                upstream.sink { [weak self] completion in
                    self?.subscriber.receive(completion: completion)
                } receiveValue: { [weak self] value in
                    self?.latestValue = value
                    self?.sendValue(value)
                }
                .store(in: &cancellables)
            }

まず、subscriberとその他ThrottleWhileに必要な値をinitで受け取り、各Publisherを購読する。

shouldThrottleの購読ではisThrottlingの値を更新しlatestValueが設定されていればそれをsendValue(_:)する。

upstreamの購読ではcompletionが来たらsubscribercompletionを送信し、値が来た時はlatestValueの値を更新しsendValue(_:)する。

sendValue(_:)はプライベートメソッドで、isThrottlingfalseかつ、demandが残っているときsubscriberに値を送信する。

            private func sendValue(_ value: Upstream.Output) {
                guard !isThrottling, demand > 0 else { return }
                context.schedule { [weak self] in
                    guard let self else { return }
                    demand += subscriber.receive(value)
                }
            }

demandは簡単に言うとsubscriberが値を送って欲しい回数のことであり、Subscriptionで定義されているrequest(_:)で受け取ることができる。

            func request(_ demand: Subscribers.Demand) {
                guard demand > 0 else { return }
                self.demand += demand
            }

あとは、残りのSubscriptionが準拠しているCancellableで定義されているcancel()を実装すれば完成だ。

            func cancel() {
                cancellables.forEach { $0.cancel() }
                cancellables.removeAll()
            }

オペレーターの追加

最後に、先に作成したPublisherを返却するオペレーターをPublisherに追加する。

今回は二種類のオペレーターを作成した。

一つ目はshouldThrottle, initialThrottle, schedulerを引数に持つオペレーターでThrottleWhileに必要な値をそのまま引数にしたものだ。

extension Publisher {
    public func throttle<T, S>(while shouldThrottle: T, initialThrottle: Bool, on scheduler: S) -> Publishers.ThrottleWhile<Self, T, S> where T: Publisher<Bool, Never>, S: Combine.Scheduler {
        Publishers.ThrottleWhile(upstream: self, shouldThrottle: shouldThrottle, initialThrottle: initialThrottle, context: scheduler)
    }
}

利用例

subject
    .throttle(while: throttle, initialThrottle: false,  on: ImmediateScheduler.shared)
    .sink {
        print("value: \($0)")
    }
    .store(in: &cancellables)

二つ目はshouldThrottleCurrentValueSubjectに限定することで、initialThrottleの設定を不要にしたものだ。

これを使えばReactiveSwiftのthrottle(while:on:)とほぼ同じ使い方ができる。

extension Publisher {
    public func throttle<T, S>(while shouldThrottle: T, on scheduler: S) -> Publishers.ThrottleWhile<Self, T, S> where T: CurrentValueSubject<Bool, Never>, S: Combine.Scheduler {
        Publishers.ThrottleWhile(upstream: self, shouldThrottle: shouldThrottle, initialThrottle: shouldThrottle.value, context: scheduler)
    }
}

利用例

subject
    .throttle(while: throttle, on: ImmediateScheduler.shared)
    .sink {
        print("value: \($0)")
    }
    .store(in: &cancellables)

ReactiveSwiftのthrottle(while:on:)の例

producer
    .throttle(while: throttle, on: ImmediateScheduler())
    .startWithValues {
        print("value: \($0)")
   }

コード全文

後書き

今回は、CombineでReactiveSwiftのオペレーターを再現してみた。

一般的にオペレーターの追加は既存のオペレーターを利用する形で実装する場合が多いが、今回は公式の実装に倣う形で作成することで、Combineの内部構造について深く理解することができた。

一見するとCombineはプロトコル指向で取っ付きにくいと感じるかもしれないが、実際には拡張性が高く、柔軟に対応できることが分かる。

もし現在Combineを何となく使っていて、内部構造を詳しく知りたければ、オペレーターの作成から始めてみるのが良いだろう。

ところで、既にCombineはObservationへの置き換えが始まっており、ストリーム的な処理もasync-algorithmsで置き換えられるようになっている。
未だSwift6のSendableに対応していないのも含め、Combineが非推奨になる日は近いかもしれない。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?