CombineのPublisherにignoreNilを実装するを投稿したら
同じもの作ってたw
— YuTa (@yuta24) December 12, 2019
あと、materializeとdistinctUntilChangedを作りたい。 https://t.co/bXoWlcicTD
とのコメントがありました。distinctUntilChangedはCombineではremoveDuplicatesですがmaterializeは存在しません(よね?)。
ということでmaterializeっぽいものを作ってみます。
extension Publishers {
public enum Event<T, E: Error> {
case value(T)
case failure(Error)
case finished
}
public class Materialized<Upstream: Publisher>: Publisher {
public typealias Output = Event<Upstream.Output, Upstream.Failure>
public typealias Failure = Never
private let upstream: Upstream
init(upstream: Upstream) {
self.upstream = upstream
}
public func receive<S>(subscriber: S) where S : Subscriber, Materialized.Failure == S.Failure, Materialized.Output == S.Input {
let cancellable = upstream.sink(receiveCompletion: { completion in
switch completion {
case .failure(let error):
subscriber.receive(.failure(error))
case .finished:
subscriber.receive(.finished)
}
subscriber.receive(completion: .finished)
}) { output in
_ = subscriber.receive(.value(output))
}
let subscription = MaterializedSubscription(cancellable: cancellable)
subscriber.receive(subscription: subscription)
}
private class MaterializedSubscription: Subscription {
private let cancellable: AnyCancellable
init(cancellable: AnyCancellable) {
self.cancellable = cancellable
}
func request(_ demand: Subscribers.Demand) {}
func cancel() {
cancellable.cancel()
}
}
}
}
extension Publisher {
func materialize() -> Publishers.Materialized<Self> {
Publishers.Materialized(upstream: self)
}
}
こんな感じでしょうか。MaterializedはinitでUpstreamを引数にとります。
receiveメソッドではupstreamをsinkして監視し、subscriberにイベントを送出します。
MaterializedSubscriptionが重要で、upstearmのsinkから返るAnyCancellableを保持してキャンセル処理を伝えます。
以下のように使うと、
let s = PassthroughSubject<Int, Error>()
let c = s.materialize().print().sink { print($0) }
s.send(1)
s.send(2)
s.send(completion: .failure(NSError(domain: "domain", code: 1)))
//s.send(completion: .finished)
s.send(3)
receive subscription: ((extension in __lldb_expr_6):Combine.Publishers.Materialized<Combine.PassthroughSubject<Swift.Int, Swift.Error>>.MaterializedSubscription)
request unlimited
receive value: (value(1))
value(1)
receive value: (value(2))
value(2)
receive value: (failure(Error Domain=domain Code=1 "(null)"))
failure(Error Domain=domain Code=1 "(null)")
receive finished
のような出力が得られます。
多分こんな感じでmaterializeが実現するとは思うんですが、問題があればコメントで教えてください。