Combineには、2つや3つのPublisherをCombineLatestするPublishers.CombineLatest2・Publishers.CombineLatest3はありますが、任意長の配列をCombineLatestすることはできません。
ここにCombineLatestCollection
の実装がありますが、この実装はCombine的に間違っているため、他のOperatorと組み合わせるとクラッシュしたりStreamが崩壊したりします。
なので、Combine的に(多分)正しいCombineLatestCollection
の実装を置いておきます。
(速度が重要で、メインスレッドでのみ使うという要件で作ったので、@inlinableを付けていたり、Lockしていなかったりしますが、そこは適宜調整してください。)
extension Collection where Element: Publisher {
    /// A publisher that combines the latest values of every publisher in this collection.
    ///
    /// This only wraps `self` in a `Publishers.CombineLatestCollection`; the upstream
    /// publishers are not subscribed to until that publisher receives a subscriber.
    public var combineLatest: Publishers.CombineLatestCollection<Self> {
        return .init(upstreams: self)
    }
}
extension Publishers {
    /// A publisher that combines the latest values of an arbitrary-length collection of publishers.
    ///
    /// Each upstream value is tagged with the position of its publisher in `upstreams`
    /// (as produced by `enumerated()`), and all upstreams are funnelled into a single
    /// shared `Inner` subscriber that assembles the `[Output]` array for the downstream.
    public struct CombineLatestCollection<Upstreams: Collection>: Publisher where Upstreams.Element: Combine.Publisher {
        /// The latest value of every upstream, in the iteration order of `upstreams`.
        public typealias Output = [Upstreams.Element.Output]
        public typealias Failure = Upstreams.Element.Failure

        /// The publishers whose latest values are combined.
        public let upstreams: Upstreams

        /// Creates a publisher that combines the latest values of `upstreams`.
        public init(upstreams: Upstreams) { self.upstreams = upstreams }

        /// Attaches `downstream` by subscribing one shared `Inner` to every upstream publisher.
        ///
        /// - Parameter downstream: The subscriber that receives the combined arrays.
        public func receive<Downstream: Subscriber>(subscriber downstream: Downstream)
            where Downstream.Input == Output, Downstream.Failure == Self.Failure
        {
            // With no upstreams nobody would ever hand a subscription to `Inner`, so the
            // downstream would receive neither a subscription nor a completion. Behave
            // like `Empty` instead: deliver a subscription, then finish immediately.
            guard !upstreams.isEmpty else {
                Empty<Output, Failure>().subscribe(downstream)
                return
            }

            let inner = Inner<Downstream>(downstream: downstream, upstreamCount: upstreams.count)
            for (index, upstream) in upstreams.enumerated() {
                // Tag every value with the upstream's position so `Inner` knows which slot to update.
                upstream.map { (index: index, value: $0) }.subscribe(inner)
            }
        }
    }
}
extension Publishers.CombineLatestCollection {
    /// The single subscriber shared by every upstream of a `CombineLatestCollection`.
    ///
    /// It collects one subscription per upstream, keeps the most recent value of each
    /// upstream, and forwards the combined array to `downstream`.
    ///
    /// Behaviour:
    /// - The downstream receives its subscription only after every upstream has delivered one.
    /// - Nothing is emitted until every upstream has produced at least one value.
    /// - The stream finishes once *all* upstreams have finished; a failure of any upstream
    ///   fails the stream immediately and cancels the remaining upstreams.
    /// - At most as many arrays are delivered as the downstream has requested. Values that
    ///   arrive without outstanding demand only update the stored latest value.
    ///
    /// No locking is performed; this type assumes all calls happen on one thread.
    public final class Inner<Downstream: Combine.Subscriber>: Combine.Subscriber where Downstream.Input == [Upstreams.Element.Output] {
        public typealias Input = (index: Int, value: Upstreams.Element.Output)
        public typealias Failure = Downstream.Failure

        @usableFromInline let downstream: Downstream
        @usableFromInline let upstreamCount: Int
        /// The subscription handed to `downstream`; fans out to all upstream subscriptions.
        @usableFromInline let subscription = Subscription()
        /// Per-upstream slots used until every upstream has emitted at least once.
        @usableFromInline var prebuildStorage: [Upstreams.Element.Output?]
        /// Latest value of every upstream; non-nil once all upstreams have emitted.
        @usableFromInline var completeStorage: [Upstreams.Element.Output]? = nil
        /// `true` once `completeStorage` has been built.
        @usableFromInline var isValueCompleted = false
        /// `true` once the stream has terminated (all finished, or one failure).
        @usableFromInline var isCompleted = false
        /// Number of upstreams that have called `receive(subscription:)`.
        @usableFromInline var receivedSubscriptionCount: Int = 0
        /// Number of upstreams that have sent `.finished`.
        @usableFromInline var finishedUpstreamCount: Int = 0
        /// `true` once `downstream.receive(subscription:)` has been called.
        @usableFromInline var isDownstreamSubscribed = false
        /// A terminal event that arrived before the downstream had its subscription.
        @usableFromInline var pendingCompletion: Subscribers.Completion<Downstream.Failure>? = nil

        /// - Parameters:
        ///   - downstream: The subscriber that receives the combined arrays.
        ///   - upstreamCount: The number of upstream publishers that will subscribe this object.
        @inlinable init(downstream: Downstream, upstreamCount: Int) {
            self.downstream = downstream
            self.upstreamCount = upstreamCount
            self.prebuildStorage = [Upstreams.Element.Output?](repeating: nil, count: upstreamCount)
        }

        /// Collects an upstream subscription; hands the aggregate subscription to the
        /// downstream once every upstream has delivered one.
        @inlinable public func receive(subscription: Combine.Subscription) {
            receivedSubscriptionCount += 1
            if isCompleted || self.subscription.isCancelled {
                // An earlier upstream already terminated the stream: do not keep this one alive.
                subscription.cancel()
            } else {
                self.subscription.subscriptions.append(subscription)
            }
            guard receivedSubscriptionCount == upstreamCount else { return }

            // Set the flag first: the downstream usually requests demand synchronously,
            // which may re-enter `receive(_:)` / `receive(completion:)`.
            isDownstreamSubscribed = true
            downstream.receive(subscription: self.subscription)

            // A terminal event that arrived too early is delivered only now, so the
            // downstream always sees its subscription before the completion.
            if let completion = pendingCompletion {
                pendingCompletion = nil
                downstream.receive(completion: completion)
            }
        }

        /// Stores the latest value of one upstream and, when every upstream has a value and
        /// the downstream has outstanding demand, delivers the combined array.
        ///
        /// - Returns: Always `.none`; additional demand returned by the downstream is
        ///   requested from *all* upstreams through the aggregate subscription instead.
        @inlinable public func receive(_ input: (index: Int, value: Upstreams.Element.Output)) -> Subscribers.Demand {
            guard !isCompleted, !subscription.isCancelled else { return .none }

            if isValueCompleted {
                completeStorage?[input.index] = input.value
            } else {
                prebuildStorage[input.index] = input.value
                guard prebuildStorage.allSatisfy({ $0 != nil }) else { return .none }
                // Every slot is filled, so unwrapping keeps all `upstreamCount` elements.
                let values: [Upstreams.Element.Output] = prebuildStorage.compactMap { $0 }
                completeStorage = values
                isValueCompleted = true
                prebuildStorage.removeAll()
            }

            // Without outstanding downstream demand the value is only remembered.
            guard let latest = completeStorage, subscription.pendingDemand > .none else { return .none }
            subscription.pendingDemand -= 1
            let additional = downstream.receive(latest)
            // `request` ignores `.none` and is a no-op after cancellation.
            subscription.request(additional)
            return .none
        }

        /// Handles the completion of one upstream.
        ///
        /// `.finished` is forwarded only after all upstreams have finished; `.failure`
        /// is forwarded immediately. In both cases the upstream subscriptions are
        /// cancelled and released before the downstream is notified.
        @inlinable public func receive(completion: Subscribers.Completion<Downstream.Failure>) {
            guard !isCompleted, !subscription.isCancelled else { return }

            if case .finished = completion {
                finishedUpstreamCount += 1
                // Other upstreams can still change the combined value.
                guard finishedUpstreamCount >= upstreamCount else { return }
            }

            isCompleted = true
            subscription.cancel()
            if isDownstreamSubscribed {
                downstream.receive(completion: completion)
            } else {
                // The downstream has no subscription yet; deliver after it gets one.
                pendingCompletion = completion
            }
        }
    }

    /// The subscription given to the downstream; it fans requests and cancellation out to
    /// every upstream subscription and tracks the downstream's outstanding demand.
    public final class Subscription: Combine.Subscription {
        /// The upstream subscriptions collected so far; emptied on cancellation.
        @usableFromInline var subscriptions = [Combine.Subscription]()
        /// `true` after `cancel()` has been called.
        @usableFromInline var isCancelled = false
        /// Demand requested by the downstream that has not been fulfilled yet.
        @usableFromInline var pendingDemand: Subscribers.Demand = .none

        /// Records `demand` and forwards it to every upstream subscription.
        ///
        /// Requests of `.none` and requests after cancellation are ignored.
        public func request(_ demand: Subscribers.Demand) {
            guard !isCancelled, demand > .none else { return }
            pendingDemand += demand
            for subscription in subscriptions {
                // A re-entrant cancellation may happen while upstreams emit synchronously.
                if isCancelled { break }
                subscription.request(demand)
            }
        }

        /// Cancels every upstream subscription and releases them. Safe to call repeatedly.
        public func cancel() {
            guard !isCancelled else { return }
            isCancelled = true
            let active = subscriptions
            subscriptions.removeAll()
            for subscription in active {
                subscription.cancel()
            }
        }
    }
}