LoginSignup
0
2

More than 1 year has passed since last update.

[Combine] PublisherのArrayをCombineLatestする。

Posted at

Combineには2つや3つのPublisherをCombineLatestするPublishers.CombineLatestやPublishers.CombineLatest3はありますが、任意長の配列をCombineLatestすることはできません。

ここにCombineLatestCollectionの実装がありますが、この実装はCombine的に間違っているため、他のOperatorと組み合わせるとクラッシュしたりStreamが崩壊したりします。

なので、Combine的に(多分)正しいCombineLatestCollectionの実装を置いておきます。

(速度重視かつメインスレッドのみで動かす要件で作ったので、@inlinableを付けていたり、Lockしていなかったりしますが、そこは適宜調整してください。)

extension Collection where Element: Publisher {
    /// A publisher built from this collection that republishes the latest values
    /// of all of its element publishers as a single array.
    public var combineLatest: Publishers.CombineLatestCollection<Self> {
        return .init(upstreams: self)
    }
}

extension Publishers {
    /// A publisher that subscribes to every publisher in a collection and republishes
    /// their most recent values as an array, ordered by each publisher's position in
    /// the collection.
    public struct CombineLatestCollection<Upstreams: Collection>: Publisher where Upstreams.Element: Combine.Publisher {
        public typealias Output = [Upstreams.Element.Output]
        public typealias Failure = Upstreams.Element.Failure

        /// The publishers whose latest values are combined.
        public let upstreams: Upstreams

        /// Creates a publisher that combines the given upstream publishers.
        ///
        /// - Parameter upstreams: The publishers to combine.
        public init(upstreams: Upstreams) { self.upstreams = upstreams }

        /// Attaches `downstream` to all upstream publishers through a shared `Inner` subscriber.
        ///
        /// - Parameter downstream: The subscriber that receives the combined arrays.
        public func receive<Downstream: Subscriber>(subscriber downstream: Downstream)
            where Downstream.Input == Output, Downstream.Failure == Self.Failure
        {
            // With no upstreams nothing would ever subscribe to `Inner`, so the downstream
            // would never receive a subscription or a completion. Terminate it right away.
            guard !upstreams.isEmpty else {
                downstream.receive(subscription: Subscriptions.empty)
                downstream.receive(completion: .finished)
                return
            }

            let inner = Inner<Downstream>(downstream: downstream, upstreamCount: upstreams.count)
            for (index, upstream) in upstreams.enumerated() {
                // Tag every value with the position of its source so `Inner` knows which slot to update.
                upstream.map { (index: index, value: $0) }.subscribe(inner)
            }
        }
    }
}

extension Publishers.CombineLatestCollection {
    /// The subscriber shared by all upstreams. It stores the latest value of each upstream
    /// and forwards the combined array downstream once every upstream has emitted.
    ///
    /// Termination rules:
    /// - A failure from any upstream is forwarded immediately and the other upstreams are cancelled.
    /// - `.finished` is forwarded only after every upstream has finished.
    ///
    /// No locking is performed in this class.
    /// NOTE(review): completions carry no upstream index, so an upstream that finishes without
    /// ever emitting cannot be detected; the stream then stays open until all upstreams finish.
    public final class Inner<Downstream: Combine.Subscriber>: Combine.Subscriber where Downstream.Input == [Upstreams.Element.Output] {
        public typealias Input = (index: Int, value: Upstreams.Element.Output)
        public typealias Failure = Downstream.Failure

        @usableFromInline let downstream: Downstream
        @usableFromInline let upstreamCount: Int
        @usableFromInline let subscription = Subscription()
        /// One slot per upstream; a slot is `nil` until that upstream has emitted.
        @usableFromInline var prebuildStorage: [Upstreams.Element.Output?]
        /// Latest value of every upstream; `nil` until all upstreams have emitted at least once.
        @usableFromInline var completeStorage: [Upstreams.Element.Output]? = nil
        @usableFromInline var isValueCompleted = false
        /// `true` once a terminal completion has been forwarded downstream.
        @usableFromInline var isCompleted = false
        /// Number of upstreams that have sent `.finished`.
        @usableFromInline var finishedCount = 0
        /// `true` once the downstream has been handed a subscription.
        @usableFromInline var hasSentSubscription = false

        @inlinable init(downstream: Downstream, upstreamCount: Int) {
            self.downstream = downstream
            self.upstreamCount = upstreamCount
            self.prebuildStorage = [Upstreams.Element.Output?](repeating: nil, count: upstreamCount)
        }

        /// Collects upstream subscriptions and hands the aggregate subscription to the
        /// downstream once all upstreams have attached.
        @inlinable public func receive(subscription: Combine.Subscription) {
            // An upstream attaching after the stream already terminated is no longer needed.
            guard !isCompleted else {
                subscription.cancel()
                return
            }
            self.subscription.subscriptions.append(subscription)
            guard self.subscription.subscriptions.count == upstreamCount else { return }
            // Set the flag first: the downstream may request demand synchronously,
            // which can re-enter this object.
            hasSentSubscription = true
            downstream.receive(subscription: self.subscription)
        }

        /// Stores the value in its upstream's slot and forwards the combined array
        /// once every upstream has emitted at least once.
        ///
        /// - Returns: The demand returned by the downstream, or `.none` when nothing was forwarded.
        @inlinable public func receive(_ input: (index: Int, value: Upstreams.Element.Output)) -> Subscribers.Demand {
            // Never deliver values after a terminal completion.
            guard !isCompleted else { return .none }

            if completeStorage != nil {
                // Optional chaining mutates the stored array in place.
                completeStorage?[input.index] = input.value
            } else {
                prebuildStorage[input.index] = input.value
                guard prebuildStorage.allSatisfy({ $0 != nil }) else { return .none }
                // Every slot is filled, so `compactMap` only strips the optional wrapper.
                completeStorage = prebuildStorage.compactMap { $0 }
                isValueCompleted = true
            }

            guard let latest = completeStorage else { return .none }
            return downstream.receive(latest)
        }

        /// Forwards a failure immediately; forwards `.finished` only after all upstreams finished.
        @inlinable public func receive(completion: Subscribers.Completion<Downstream.Failure>) {
            guard !isCompleted else { return }

            switch completion {
            case .finished:
                finishedCount += 1
                // Other upstreams can still emit, so keep the stream open until all have finished.
                guard finishedCount == upstreamCount else { return }
                isCompleted = true
                // All upstreams are done; drop their subscriptions.
                subscription.subscriptions.removeAll()
                downstream.receive(completion: .finished)

            case .failure:
                isCompleted = true
                if !hasSentSubscription {
                    // The failure arrived before all upstreams attached; the downstream
                    // must still get a subscription before its completion.
                    hasSentSubscription = true
                    downstream.receive(subscription: Subscriptions.empty)
                }
                // Stop the upstreams that are still running.
                subscription.cancel()
                downstream.receive(completion: completion)
            }
        }
    }

    /// The aggregate subscription handed to the downstream; each call is fanned out
    /// to every stored upstream subscription.
    public final class Subscription: Combine.Subscription {
        /// The upstream subscriptions this object forwards to.
        @usableFromInline var subscriptions = [Combine.Subscription]()

        /// Forwards the same demand to each stored upstream subscription.
        public func request(_ demand: Subscribers.Demand) {
            let targets = subscriptions
            targets.forEach { $0.request(demand) }
        }

        /// Cancels each stored upstream subscription.
        public func cancel() {
            let targets = subscriptions
            targets.forEach { $0.cancel() }
        }
    }
}
0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2