2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

CombineのPublisherにmaterializeを実装してみる

Posted at

CombineのPublisherにignoreNilを実装するを投稿したら

とのコメントがありました。distinctUntilChangedはCombineではremoveDuplicatesですがmaterializeは存在しません(よね?)。

ということでmaterializeっぽいものを作ってみます。

extension Publishers {
    public enum Event<T, E: Error> {
        case value(T)
        case failure(Error)
        case finished
    }

    public class Materialized<Upstream: Publisher>: Publisher {
        public typealias Output = Event<Upstream.Output, Upstream.Failure>
        public typealias Failure = Never
        
        private let upstream: Upstream
        
        init(upstream: Upstream) {
            self.upstream = upstream
        }
        
        public func receive<S>(subscriber: S) where S : Subscriber, Materialized.Failure == S.Failure, Materialized.Output == S.Input {
            let cancellable = upstream.sink(receiveCompletion: { completion in
                switch completion {
                case .failure(let error):
                    subscriber.receive(.failure(error))
                case .finished:
                    subscriber.receive(.finished)
                }
                subscriber.receive(completion: .finished)
            }) { output in
                _ = subscriber.receive(.value(output))
            }
            let subscription = MaterializedSubscription(cancellable: cancellable)
            subscriber.receive(subscription: subscription)
        }
        
        private class MaterializedSubscription: Subscription {
            private let cancellable: AnyCancellable
            
            init(cancellable: AnyCancellable) {
                self.cancellable = cancellable
            }

            func request(_ demand: Subscribers.Demand) {}
            
            func cancel() {
                cancellable.cancel()
            }
        }
    }
}

extension Publisher {
    func materialize() -> Publishers.Materialized<Self> {
        Publishers.Materialized(upstream: self)
    }
}

こんな感じでしょうか。MaterializedはinitでUpstreamを引数にとります。
receiveメソッドではupstreamをsinkして監視し、subscriberにイベントを送出します。
MaterializedSubscriptionが重要で、upstearmのsinkから返るAnyCancellableを保持してキャンセル処理を伝えます。
以下のように使うと、

let s = PassthroughSubject<Int, Error>()
let c = s.materialize().print().sink { print($0) }

s.send(1)
s.send(2)
s.send(completion: .failure(NSError(domain: "domain", code: 1)))
//s.send(completion: .finished)
s.send(3)
receive subscription: ((extension in __lldb_expr_6):Combine.Publishers.Materialized<Combine.PassthroughSubject<Swift.Int, Swift.Error>>.MaterializedSubscription)
request unlimited
receive value: (value(1))
value(1)
receive value: (value(2))
value(2)
receive value: (failure(Error Domain=domain Code=1 "(null)"))
failure(Error Domain=domain Code=1 "(null)")
receive finished

のような出力が得られます。

多分こんな感じでmaterializeが実現するとは思うんですが、問題があればコメントで教えてください。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?