Subjectってどう実装すればいいの?
iOS13で導入されたCombine。iOS14が出てiOS12を切るタイミングで導入したいと考えている人は多いのではないでしょうか。
自分も少しずつCombineに取り組むため、まず部品として切り出しやすそうなUserDefaultsをCombineで扱う方法を調べました。
UserDefaultsは値の読み書き両方をこなうので Subject
として実装するのが良さそうです。
しかしドキュメントを見ても実装方法が正直よくわかりません……
というわけで、試行錯誤してSubject
を実装して学んだことを書くことにしました。
注意
残念ながら本当に正解の実装か(Combineとして推奨される実装か)はわかりません。
この記事ではCurrentValueSubjectを調査し、同じ動作をすれば正解とみなします。
Subjectとはなにか
[まずSubjectはなにか確認しましょう](Subject - Combine | Apple Developer Documentation)。SubjectはPublisherを継承しています。
Publisherは Publisherの内部で値を発生させ、通知するもの と考えると良いと思います。
例えばPublisherに準拠したAnyPublisher<Int, Error>
の場合、AnyPublisherの内部でIntまたはError(とCompletion.finish)を発生させ、外部に通知することができます。
ではSubjectは何かというと Subjectの外部から値を受け取り、通知するもの となります。
例えばSubjectに準拠したCurrentValueSubject<Int, Error>
の場合、subject.send(1)
のように外部から値を渡し、それを通知することができます。
もちろんPublisherでもあるため、内部で値を発生させる可能性もあります。(CurrentValueSubjectは多分内部で値を発生させない)
簡易版CurrentValueSubjectクローン
CurrentValueSubjectと同じような動作をするMySubjectを作ります。
まずは簡易版として以下だけ考慮することにします。
-
sink()
で値を受け取れる -
cancel()
が可能
[実装したコード全体はこちらでご確認ください。](Combine.Subjectの簡易で不完全な実装 · GitHub)
いくつかポイントだけ解説します。
Publisherの機能を実装
func receive<S>(
subscriber: S
) where S : Combine.Subscriber, Failure == S.Failure, Output == S.Input {
let subscription = MySubjectSubscription(subscriber: subscriber, cancel: cancel(subscription:))
subscriber.receive(subscription: subscription)
subscriptions.append(subscription)
subscription.receive(value)
}
protocol Publisher
として必要なメソッドです。 sink()
すると呼ばれます。
sink()
でこれが呼ばれるというのは少し分かりにくいですが、以下のふたつが同じ内容だと考えればピンとくると思います。
let cancelable = subject.sink(receiveCompletion: { (_) in },
receiveValue: { (_) in })
let sink = Subscribers.Sink<Int, Error>(receiveCompletion: { (_) in },
receiveValue: { (_) in })
subject.receive(subscriber: sink)
let cancelable = AnyCancellable(sink)
Publisherの func receive<S>()
で必要なことは以下の2つ(+おまけ)
- Publisher(self)とSubscriber(引数subscriber)の繋がりを表すSubscriptionを生成する
- SubscriberにSubscriptionを渡す
- (おまけ)今回はCurrentValueSubjectのコピーなので現在の値を即座に通知する
Subscriptionを実装
Subscriptionで重要なことは cancel()
を受け取ることです。
- AnyCancellableのオブジェクトが消えたとき
- AnyCancellable.cancel()が呼ばれたとき
にSubscriptionのcancel()
が呼ばれます。
今回の実装例ではSubscriptionからPublisher(Subject)にcallbackして、Publisherが保持するSubscriptionを消すようにしました。
ちなみにAnyCancellable.cancel()のあとAnyCancellableが消えても、Subscriptionのcancel()
は1回しか呼ばれません。
CurrentValueSubjectの動作を確認する
自分の当初の目的はUserDefaultsをCombineで扱うことでした。
value
のdidSetでUserDefaultsの値を更新するだけで十分使えそうですが、Subjectのfunc send(subscription: Subscription)
が空っぽだったりと不完全さは拭えません。
しかしドキュメントを読んでも、実装しないと困るか、実装するならどうすればいいか、いまいちわからない。
そこで今回はCurrentValueSubjectの動作を確認しました。
MySubjectがCurrentValueSubjectと同じように動作すれば、Subjectを正しく実装できたと考えてもいいでしょう。
CurrentValueSubjectの動作で個人的にわかりにくいと思ったものを調査した[Playgroundで実行できるコード](CurrentValueSubjectの詳細な動作を確認する · GitHub)がこちらです。
以下、一部は記事内でも確認していきます。
completionすると値は流れない
let subject: CurrentValueSubject<Int, Error> = .init(1)
let cancelable = subject.sink(receiveCompletion: { (completion) in
completion
return
}, receiveValue: { (value) in
// 1回だけ実行される
// completionの後は呼ばれない
value
return
})
subject.send(completion: .finished)
// completionしていてもvalueは取れる
subject.value // => 1
subject.send(2)
// completionしているとsendは無視される
subject.value // => 1
subject.value = 3
// completionしていてもvalueは有効。
// valueはCombine.SubjectではなくCurrentValueSubjectの機能だから?
subject.value // => 3
completionさせると値が通知されなくなります。
ただvar value: Output
は読み書きできるようです。var value: Output
はSubjectではなくCurrentValueSubjectのプロパティなので関係なく動くのでしょう。
なおここから先はsend()
でvar value: Output
が更新されれば通知されたものとして説明していきます(sinkのコールバックが何回呼ばれるか、などの説明は書きにくいため)。
SubjectはPublisherをsubscribeできる
let subject1: CurrentValueSubject<Int, Error> = .init(1)
let subject2: CurrentValueSubject<Int, Error> = .init(100)
subject1.value // => 1
subject2.value // => 100
let cancelable = subject1.subscribe(subject2)
// currentValueSubjectはsubscribeした瞬間現在の値が流れるので、subject1,2は同じ値になる
subject1.value // => 1
subject2.value // => 1
subject1.send(2)
// 上流であるsubject1に値を流すとsubject2に値が流れる
subject1.value // => 2
subject2.value // => 2
subject2.send(200)
// 下流であるsubject2に値を流してもsubject1には影響しない
subject1.value // => 2
subject2.value // => 200
subject1.subscribe(subject2)
とするとsubject1で発生した値がsubject2に反映されるようになります。
上流をcompletionするとSubjectもcompletionする
let subject1: CurrentValueSubject<Int, Error> = .init(1)
let subject2: CurrentValueSubject<Int, Error> = .init(100)
let cancelable = subject1.subscribe(subject2)
subject1.value // => 1
subject2.value // => 1
subject1.send(completion: .finished)
// subject1を完了するとsubject2も完了する
// subject2へのsendが無効になる
subject2.send(200)
subject1.value // => 1
subject2.value // => 1
上流が完了すると、下流のSubjectも完了します。
上記例ではsubject2が完了するためsubject2.send(200)
が機能していません。
completionすれば値を受け取らない
let subject1: CurrentValueSubject<Int, Error> = .init(1)
let subject2: CurrentValueSubject<Int, Error> = .init(100)
let cancellable = subject1.subscribe(subject2)
subject1.value // => 1
subject2.value // => 1
subject2.send(completion: .finished)
subject1.send(2)
// subject2が完了しているので、subject1からsubject2に値を流す設定でもsubject2には影響しない
subject1.value // => 2
subject2.value // => 1
Subjectをcompletionさせると、上流から値を受け取らなくなります。
完全版CurrentValueSubjectクローン
動作確認の結果をもとにCurrentValueSubjectと同じ動作をするMySubjectを作ります(本当に完全かは怪しい)。
Gistにもあげています。
import Combine
class MySubject<Output, Failure : Error> : Combine.Subject {
var value: Output {
didSet {
// 完了していなければ値を通知する
guard completion == nil else { return }
subscriptions.forEach { $0.receive(value) }
}
}
private var completion: Subscribers.Completion<Failure>? {
didSet {
// 完了を2回送らないように注意
guard oldValue == nil, let completion = completion else { return }
subscriptions.forEach { $0.receive(completion: completion) }
}
}
private var subscriptions: [MySubjectSubscription<Output, Failure>] = []
init(_ value: Output) {
self.value = value
}
// Publisher
func receive<S>(
subscriber: S
) where S : Combine.Subscriber, Failure == S.Failure, Output == S.Input {
let subscription = MySubjectSubscription(subscriber: subscriber, cancel: cancel(subscription:))
subscriber.receive(subscription: subscription)
subscriptions.append(subscription)
if let completion = completion {
// 完了していれば即時通知
subscription.receive(completion: completion)
} else {
// CurrentValueSubjectは値を保持しているので、完了していなければ即時通知
subscription.receive(value)
}
}
// Subject
func send(_ value: Output) {
guard completion == nil else { return }
self.value = value
}
func send(completion: Subscribers.Completion<Failure>) {
// 完了しているSubjectをsubscribeした場合、即時完了通知する必要があるため、completionは取っておく。
self.completion = completion
}
func send(subscription: Combine.Subscription) {
// 値が3個でいいなど決めることができないので、無限に要求する
subscription.request(.unlimited)
}
// for Subscription
private func cancel(subscription: MySubjectSubscription<Output, Failure>) {
guard let index = subscriptions.firstIndex(of: subscription) else { return }
subscriptions.remove(at: index)
}
}
class MySubjectSubscription<Output, Failure : Error> : Combine.Subscription, Equatable {
private var demand: Subscribers.Demand = .none
private var subscriber: AnySubscriber<Output, Failure>
private var cancelSubscription: (MySubjectSubscription<Output, Failure>) -> Void
init<S>(
subscriber: S,
cancel: @escaping (MySubjectSubscription<Output, Failure>) -> Void
) where S : Combine.Subscriber, Output == S.Input, Failure == S.Failure {
self.subscriber = .init(subscriber)
self.cancelSubscription = cancel
}
// Subscription
func request(_ demand: Subscribers.Demand) {
// request(_:)が複数回呼ばれることがあるか不明。
// self.demand += demand とどちらがいいかよくわからず。
self.demand = demand
}
// Cancellable
func cancel() {
cancelSubscription(self)
}
// for Subscriber
func receive(_ value: Output) {
guard demand != .none else { return }
demand -= 1 // ひとつ通知するからひとつ減らす
demand += subscriber.receive(value) // 追加で要求された数をdemandに足しておく
}
func receive(completion: Subscribers.Completion<Failure>) {
subscriber.receive(completion: completion)
}
static func == (lhs: MySubjectSubscription<Output, Failure>, rhs: MySubjectSubscription<Output, Failure>) -> Bool {
lhs.combineIdentifier == rhs.combineIdentifier
}
}
ポイントだけ解説します。
subscribe(_ subject: S)
に対応する
func send(subscription: Combine.Subscription)
は以下の処理を実行すると呼ばれます。
let publisher = Publishers.Sequence<[Int], Error>(sequence: [0, 1, 2])
let ms: MySubject<Int, Error> = .init(1)
publisher.subscribe(ms)
PublisherからSubjectに値を流すためのsubscriptionが引数で渡されるので、subscriptionに対していくつ値を送って欲しいかを要求します。
まとめ
Combine.Subjectを少し丁寧に実装しました。
completionも正確に機能しますし、subscribe(_ subject: S)
も正常に動作するので以下のような処理も動きます。
let subject1: MySubject<Int, Error> = .init(1)
let subject2: MySubject <Int, Error> = .init(100)
let cancellable = all.filter(isSugoi(_:)).subscribe(sugoi)
これをベースにUserDefaultsを扱う機能を追加すれば、UserDefaultsをCombineで扱うことができそうです。
おまけ
Publisher
を実装するときprint(#function)
すると
extension Publisher {
public func print(_ prefix: String = "", to stream: TextOutputStream? = nil) -> Publishers.Print<Self>
}
が呼ばれて動いていないと勘違いするので注意しましょう😭