2
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

updated at

Combine.Subjectはどう実装すればいいか

Subjectってどう実装すればいいの?

iOS13で導入されたCombine。iOS14が出てiOS12を切るタイミングで導入したいと考えている人は多いのではないでしょうか。

自分も少しずつCombineに取り組むため、まず部品として切り出しやすそうなUserDefaultsをCombineで扱う方法を調べました。
UserDefaultsは値の読み書き両方をこなうので Subject として実装するのが良さそうです。
しかしドキュメントを見ても実装方法が正直よくわかりません……

というわけで、試行錯誤してSubjectを実装して学んだことを書くことにしました。

注意

残念ながら本当に正解の実装か(Combineとして推奨される実装か)はわかりません。
この記事ではCurrentValueSubjectを調査し、同じ動作をすれば正解とみなします。

Subjectとはなにか

まずSubjectはなにか確認しましょう。SubjectはPublisherを継承しています。

Publisherは Publisherの内部で値を発生させ、通知するもの と考えると良いと思います。
例えばPublisherに準拠したAnyPublisher<Int, Error>の場合、AnyPublisherの内部でIntまたはError(とCompletion.finish)を発生させ、外部に通知することができます。

ではSubjectは何かというと Subjectの外部から値を受け取り、通知するもの となります。
例えばSubjectに準拠したCurrentValueSubject<Int, Error>の場合、subject.send(1)のように外部から値を渡し、それを通知することができます。
もちろんPublisherでもあるため、内部で値を発生させる可能性もあります。(CurrentValueSubjectは多分内部で値を発生させない)

簡易版CurrentValueSubjectクローン

CurrentValueSubjectと同じような動作をするMySubjectを作ります。
まずは簡易版として以下だけ考慮することにします。

  • sink()で値を受け取れる
  • cancel()が可能

実装したコード全体はこちらでご確認ください。
いくつかポイントだけ解説します。

Publisherの機能を実装

func receive<S>(
    subscriber: S
) where S : Combine.Subscriber, Failure == S.Failure, Output == S.Input {
    let subscription = MySubjectSubscription(subscriber: subscriber, cancel: cancel(subscription:))
    subscriber.receive(subscription: subscription)
    subscriptions.append(subscription)

    subscription.receive(value)
}

protocol Publisherとして必要なメソッドです。 sink()すると呼ばれます。
sink()でこれが呼ばれるというのは少し分かりにくいですが、以下のふたつが同じ内容だと考えればピンとくると思います。

let cancelable = subject.sink(receiveCompletion: { (_) in },
                              receiveValue: { (_) in })
let sink = Subscribers.Sink<Int, Error>(receiveCompletion: { (_) in },
                                        receiveValue: { (_) in })
subject.receive(subscriber: sink)
let cancelable = AnyCancellable(sink)

Publisherの func receive<S>() で必要なことは以下の2つ(+おまけ)

  • Publisher(self)とSubscriber(引数subscriber)の繋がりを表すSubscriptionを生成する
  • SubscriberにSubscriptionを渡す
  • (おまけ)今回はCurrentValueSubjectのコピーなので現在の値を即座に通知する

Subscriptionを実装

Subscriptionで重要なことは cancel()を受け取ることです。

  • AnyCancellableのオブジェクトが消えたとき
  • AnyCancellable.cancel()が呼ばれたとき

にSubscriptionのcancel()が呼ばれます。
今回の実装例ではSubscriptionからPublisher(Subject)にcallbackして、Publisherが保持するSubscriptionを消すようにしました。

ちなみにAnyCancellable.cancel()のあとAnyCancellableが消えても、Subscriptionのcancel()は1回しか呼ばれません。

CurrentValueSubjectの動作を確認する

自分の当初の目的はUserDefaultsをCombineで扱うことでした。
value のdidSetでUserDefaultsの値を更新するだけで十分使えそうですが、Subjectのfunc send(subscription: Subscription)が空っぽだったりと不完全さは拭えません。

しかしドキュメントを読んでも、実装しないと困るか、実装するならどうすればいいか、いまいちわからない。

そこで今回はCurrentValueSubjectの動作を確認しました。
MySubjectがCurrentValueSubjectと同じように動作すれば、Subjectを正しく実装できたと考えてもいいでしょう。

CurrentValueSubjectの動作で個人的にわかりにくいと思ったものを調査したPlaygroundで実行できるコードがこちらです。
以下、一部は記事内でも確認していきます。

completionすると値は流れない

let subject: CurrentValueSubject<Int, Error> = .init(1)

let cancelable = subject.sink(receiveCompletion: { (completion) in
    completion
    return
}, receiveValue: { (value) in
    // 1回だけ実行される
    // completionの後は呼ばれない
    value
    return
})

subject.send(completion: .finished)
// completionしていてもvalueは取れる
subject.value // => 1

subject.send(2)
// completionしているとsendは無視される
subject.value // => 1

subject.value = 3
// completionしていてもvalueは有効。
// valueはCombine.SubjectではなくCurrentValueSubjectの機能だから?
subject.value // => 3

completionさせると値が通知されなくなります。

ただvar value: Outputは読み書きできるようです。var value: OutputはSubjectではなくCurrentValueSubjectのプロパティなので関係なく動くのでしょう。

なおここから先はsend()var value: Outputが更新されれば通知されたものとして説明していきます(sinkのコールバックが何回呼ばれるか、などの説明は書きにくいため)。

SubjectはPublisherをsubscribeできる

let subject1: CurrentValueSubject<Int, Error> = .init(1)
let subject2: CurrentValueSubject<Int, Error> = .init(100)

subject1.value // => 1
subject2.value // => 100

let cancelable = subject1.subscribe(subject2)
// currentValueSubjectはsubscribeした瞬間現在の値が流れるので、subject1,2は同じ値になる
subject1.value // => 1
subject2.value // => 1

subject1.send(2)
// 上流であるsubject1に値を流すとsubject2に値が流れる
subject1.value // => 2
subject2.value // => 2

subject2.send(200)
// 下流であるsubject2に値を流してもsubject1には影響しない
subject1.value // => 2
subject2.value // => 200

subject1.subscribe(subject2)とするとsubject1で発生した値がsubject2に反映されるようになります。

上流をcompletionするとSubjectもcompletionする

let subject1: CurrentValueSubject<Int, Error> = .init(1)
let subject2: CurrentValueSubject<Int, Error> = .init(100)

let cancelable = subject1.subscribe(subject2)
subject1.value // => 1
subject2.value // => 1

subject1.send(completion: .finished)
// subject1を完了するとsubject2も完了する
// subject2へのsendが無効になる
subject2.send(200)
subject1.value // => 1
subject2.value // => 1

上流が完了すると、下流のSubjectも完了します。
上記例ではsubject2が完了するためsubject2.send(200)が機能していません。

completionすれば値を受け取らない

let subject1: CurrentValueSubject<Int, Error> = .init(1)
let subject2: CurrentValueSubject<Int, Error> = .init(100)

let cancellable = subject1.subscribe(subject2)
subject1.value // => 1
subject2.value // => 1

subject2.send(completion: .finished)
subject1.send(2)
// subject2が完了しているので、subject1からsubject2に値を流す設定でもsubject2には影響しない
subject1.value // => 2
subject2.value // => 1

Subjectをcompletionさせると、上流から値を受け取らなくなります。

完全版CurrentValueSubjectクローン

動作確認の結果をもとにCurrentValueSubjectと同じ動作をするMySubjectを作ります(本当に完全かは怪しい)。
Gistにもあげています

Subject.swift
import Combine

class MySubject<Output, Failure : Error> : Combine.Subject {
    var value: Output {
        didSet {
            // 完了していなければ値を通知する
            guard completion == nil else { return }
            subscriptions.forEach { $0.receive(value) }
        }
    }
    private var completion: Subscribers.Completion<Failure>? {
        didSet {
            // 完了を2回送らないように注意
            guard oldValue == nil, let completion = completion else { return }
            subscriptions.forEach { $0.receive(completion: completion) }
        }
    }
    private var subscriptions: [MySubjectSubscription<Output, Failure>] = []

    init(_ value: Output) {
        self.value = value
    }

    // Publisher
    func receive<S>(
        subscriber: S
    ) where S : Combine.Subscriber, Failure == S.Failure, Output == S.Input {
        let subscription = MySubjectSubscription(subscriber: subscriber, cancel: cancel(subscription:))
        subscriber.receive(subscription: subscription)
        subscriptions.append(subscription)

        if let completion = completion {
            // 完了していれば即時通知
            subscription.receive(completion: completion)
        } else {
            // CurrentValueSubjectは値を保持しているので、完了していなければ即時通知
            subscription.receive(value)
        }
    }

    // Subject
    func send(_ value: Output) {
        guard completion == nil else { return }
        self.value = value
    }
    func send(completion: Subscribers.Completion<Failure>) {
        // 完了しているSubjectをsubscribeした場合、即時完了通知する必要があるため、completionは取っておく。
        self.completion = completion
    }
    func send(subscription: Combine.Subscription) {
        // 値が3個でいいなど決めることができないので、無限に要求する
        subscription.request(.unlimited)
    }

    // for Subscription
    private func cancel(subscription: MySubjectSubscription<Output, Failure>) {
        guard let index = subscriptions.firstIndex(of: subscription) else { return }
        subscriptions.remove(at: index)
    }
}
Subscription.swift
class MySubjectSubscription<Output, Failure : Error> : Combine.Subscription, Equatable {
    private var demand: Subscribers.Demand = .none
    private var subscriber: AnySubscriber<Output, Failure>
    private var cancelSubscription: (MySubjectSubscription<Output, Failure>) -> Void

    init<S>(
        subscriber: S,
        cancel: @escaping (MySubjectSubscription<Output, Failure>) -> Void
    ) where S : Combine.Subscriber, Output == S.Input, Failure == S.Failure {
        self.subscriber = .init(subscriber)
        self.cancelSubscription = cancel
    }

    // Subscription
    func request(_ demand: Subscribers.Demand) {
        // request(_:)が複数回呼ばれることがあるか不明。
        // self.demand += demand とどちらがいいかよくわからず。
        self.demand = demand
    }

    // Cancellable
    func cancel() {
        cancelSubscription(self)
    }

    // for Subscriber
    func receive(_ value: Output) {
        guard demand != .none else { return }
        demand -= 1 // ひとつ通知するからひとつ減らす
        demand += subscriber.receive(value) // 追加で要求された数をdemandに足しておく
    }
    func receive(completion: Subscribers.Completion<Failure>) {
        subscriber.receive(completion: completion)
    }

    static func == (lhs: MySubjectSubscription<Output, Failure>, rhs: MySubjectSubscription<Output, Failure>) -> Bool {
        lhs.combineIdentifier == rhs.combineIdentifier
    }
}

ポイントだけ解説します。

subscribe(_ subject: S)に対応する

func send(subscription: Combine.Subscription)は以下の処理を実行すると呼ばれます。

let publisher = Publishers.Sequence<[Int], Error>(sequence: [0, 1, 2])
let ms: MySubject<Int, Error> = .init(1)

publisher.subscribe(ms)

PublisherからSubjectに値を流すためのsubscriptionが引数で渡されるので、subscriptionに対していくつ値を送って欲しいかを要求します。

まとめ

Combine.Subjectを少し丁寧に実装しました。
completionも正確に機能しますし、subscribe(_ subject: S)も正常に動作するので以下のような処理も動きます。

    let subject1: MySubject<Int, Error> = .init(1)
    let subject2: MySubject <Int, Error> = .init(100)

    let cancellable = all.filter(isSugoi(_:)).subscribe(sugoi)

これをベースにUserDefaultsを扱う機能を追加すれば、UserDefaultsをCombineで扱うことができそうです。

おまけ

Publisherを実装するときprint(#function)すると

extension Publisher {
    public func print(_ prefix: String = "", to stream: TextOutputStream? = nil) -> Publishers.Print<Self>
}

が呼ばれて動いていないと勘違いするので注意しましょう😭

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
2
Help us understand the problem. What are the problem?