64
40

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

【Swift】Combineで一つのPublisherの出力結果を共有するメソッドやクラスの違い(share, multicast, Future)

Last updated at Posted at 2019-10-24

Combine.frameworkを使用していると
複数のSubscriberで同じ処理結果を共有したい場合があります。

例えば

  • ネットワーク通信を介したデータの取得
  • 画像のダウンロード

といった重い処理を繰り返し実行すると
メモリをたくさん使用したり
時間がかかるためユーザ体験を損なってしまう可能性もあります。

Combine.frameworkでは
そのような時に利用できるメソッドやクラスが用意されています。

今回はそういったメソッドやクラスの違いについて
見てみたいと思います。

下記のサンプルコードは全てPlayground上で実行しています。

share

shareメソッドでは
Publishers.Shareという
**class**のインスタンスが返ってきます。

Publishers.Share
https://developer.apple.com/documentation/combine/publisher/3204754-share

このclass
前(上流)のPublisher

後(下流)のSubscription
を保持することで
同じPublisherからの値を下流のSubscriberへ値を流すことができます。

SubscriptionSubscriberがsubscribeした時に返ってくるものです。
https://developer.apple.com/documentation/combine/subscription

最初のPublishers.Shareへのsubscribeで
Publishers.Shareが内部のPublisherへsubscribeし
Publisherは処理を開始して値を流し始めます。

そしてそれ以降のsubscribeからは
**同じPublisher**からの値の出力を受け取ることができます。

それでは具体的な例として
ネットワークリクエストの結果を
複数のSubscriberで共有する例を見ていきたいと思います。


var cancellables: Set<AnyCancellable> = []
let shared = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.google.com")!)
    .map(\.data)
    .print("shared")
    .share() // **ここでshareを呼んでいる**

print("subscribe 1回目")
shared
    .sink( receiveCompletion: { _ in },
           receiveValue: { print("subscription1 receiveValue: '\($0)'") })
    .store(in: &cancellables)
print("subscribe 2回目")
shared
    .sink( receiveCompletion: { _ in },
           receiveValue: { print("subscription2 receiveValue: '\($0)'") })
    .store(in: &cancellables)

// 出力結果
subscribe 1回目
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
subscribe 2回目
shared: receive value: (13761 bytes)
subscription1 receiveValue: '13761 bytes'
subscription2 receiveValue: '13761 bytes'
shared: receive finished

出力結果を見てみると
1回目のsubscribeでは

subscribe 1回目
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited

と(上流の)Publisherへsubscribeしていますが

2回目の場合

subscribe 2回目
shared: receive value: (13761 bytes)
subscription1 receiveValue: '13761 bytes'
subscription2 receiveValue: '13761 bytes'
shared: receive finished


1回目のsubscribeで出力されていた

shared: receive subscription: (DataTaskPublisher)
shared: request unlimited

がなく
subscribeされていない
ことがわかりました。

それでも2回目のsubscribeも値を受け取っています。

このような結果から
Publisherは一つしか存在していないことがわかります。

では
shareがなかった場合を見てみます。


var cancellables: Set<AnyCancellable> = []
let shared = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.google.com")!)
    .map(\.data)
    .print("shared")
    //.share() // **コメントアウト**

print("subscribe 1回目")
shared
    .sink( receiveCompletion: { _ in },
           receiveValue: { print("subscription1 receiveValue: '\($0)'") })
    .store(in: &cancellables)
print("subscribe 2回目")
shared
    .sink( receiveCompletion: { _ in },
           receiveValue: { print("subscription2 receiveValue: '\($0)'") })
    .store(in: &cancellables)

// 出力結果
subscribe 1回目
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
subscribe 2回目
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
shared: receive value: (13761 bytes)
subscription2 receiveValue: '13761 bytes'
shared: receive finished
shared: receive value: (13763 bytes)
subscription1 receiveValue: '13763 bytes'
shared: receive finished

となり
2回目のsubscribe時でも

subscribe 2回目
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited

と出力されています。

つまり毎回subscribeしていることがわかります。

このようにshareを使うことで
Shareが1度subscribeをするだけで済み
不要なsubscribeがなくなりました。

ただし
shareには注意点もあります。

shareでは
subscribeする前に出力された値を再度出力しません。

つまり
参照を保持するタイミングによっては
期待したい値が得られないかもしれません。


参照を保持する時点で
Publisherがcompletionしていた場合は
finished(completionイベント)のみが返ってきます。
※ sink(receiveCompletion:)の方に値が流れてきます。

下記の例を見てみます。


var cancellables: Set<AnyCancellable> = []
let shared = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.google.com")!)
    .map(\.data)
    .print("shared")
    .share()

print("subscribe 1回目")
shared
    .sink( receiveCompletion: { _ in },
           receiveValue: { print("subscription1 receiveValue: '\($0)'") })
    .store(in: &cancellables)

DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    print("subscribe 2回目")
    shared
        .sink(receiveCompletion: { print("subscription2 receiveCompletion \($0)")},
              receiveValue: { print("subscription2 receiveValue: '\($0)'") })
        .store(in: &cancellables)
}

// 出力結果
subscribe 1回目
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
shared: receive value: (13750 bytes)
subscription1 receiveValue: '13750 bytes'
shared: receive finished
subscribe 2回目
subscription2 receiveCompletion finished

出力結果からもわかるように
2回目のsubscribeのタイミングを遅らせると
Publisherはすでにcompletionしているため
finished(completionイベント)のみを受け取っていることがわかります。

multicast

上記のshareではタイミングによっては
値を受け取れない可能性がありました。

そこで
Combineでは値の出力を能動的にコントロールできる
multicastというメソッド(戻り値はPublishers.Multicastの**class**)があります。

https://developer.apple.com/documentation/combine/publisher/3204734-multicast
https://developer.apple.com/documentation/combine/publisher/3204733-multicast
https://developer.apple.com/documentation/combine/publishers/multicast

これはConnectablePublisherプロトコルに適合しています。


/// A publisher that uses a subject to deliver elements to multiple subscribers.
final public class Multicast<Upstream, SubjectType>
    : ConnectablePublisher
    where Upstream : Publisher, SubjectType : Subject,
Upstream.Failure == SubjectType.Failure, Upstream.Output == SubjectType.Output {

このプロトコルはconnectというメソッドを有しており
connectを呼び出して初めてPublisherSubscriberを受け取りを処理を開始します。
https://developer.apple.com/documentation/combine/connectablepublisher/3204394-connect

shareの例を
multicastに変えて違いを見てみます。


var cancellables: Set<AnyCancellable> = []
let multicasted = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.google.com")!)
    .map(\.data)
    .print("shared")
    .multicast { PassthroughSubject<Data, URLError>() }

print("subscribe 1回目")
multicasted
    .sink( receiveCompletion: { _ in },
           receiveValue: { print("subscription1 receiveValue: '\($0)'") })
    .store(in: &cancellables)
print("subscribe 2回目")
multicasted
    .sink( receiveCompletion: { _ in },
           receiveValue: { print("subscription2 receiveValue: '\($0)'") })
    .store(in: &cancellables)

multicasted
    .connect()
    .store(in: &cancellables)

// 出力結果
subscribe 1回目
subscribe 2回目
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
shared: receive value: (13741 bytes)
subscription1 receiveValue: '13741 bytes'
subscription2 receiveValue: '13741 bytes'
shared: receive finished

上記の結果では
shareと同じ結果を取得できました。

では
ここでconnectをコメントアウトしてみると


...

//multicasted
//    .connect()
//    .store(in: &cancellables)

// 出力結果
subscribe 1回目
subscribe 2回目

となり
subscribeしてはいるものの
Publisherはまだ処理を実行していません。

このことからmulticastでは
connectを呼ばないと
処理を実行しないことがわかります。

では
2回目のsubscribeを少し遅らせ
connectを呼び出してみた場合を今度は見てみます。


var cancellables: Set<AnyCancellable> = []
let multicasted = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.google.com")!)
    .map(\.data)
    .print("shared")
    .multicast { PassthroughSubject<Data, URLError>() }

print("subscribe 1回目")
multicasted
    .sink( receiveCompletion: { _ in },
           receiveValue: { print("subscription1 receiveValue: '\($0)'") })
    .store(in: &cancellables)

DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    print("subscribe 2回目")
    multicasted
        .sink( receiveCompletion: { _ in },
               receiveValue: { print("subscription2 receiveValue: '\($0)'") })
        .store(in: &cancellables)
}

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    print("connect")
    multicasted
        .connect()
        .store(in: &cancellables)
}

// 出力結果
subscribe 1回目
subscribe 2回目
connect
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
shared: receive value: (13745 bytes)
subscription1 receiveValue: '13745 bytes'
subscription2 receiveValue: '13745 bytes'
shared: receive finished

connectが呼ばれる前にsubscribeしているため
2回目のsubscribeでもデータがきちんと取得できています。

これでshareで起きていた問題は緩和できそうですが
ちょっと書き方が複雑であったり
connectを呼ぶタイミングによっては
subscribeし損ねてしまう可能性もまだ残っています。

他のライブラリでは出力された値を再度流してくれるメソッドがあります。

例えば
RxSwiftではshareReplayというメソッドと利用することで
指定したサイズ分の流れてきたデータをキャッシュすることができ
subscribe時にその値が流れてくるようにできますが
Combineでは現状存在しません。

もし必要な場合は↓のように独自の実装が必要になります。
https://github.com/tcldr/Entwine/blob/master/Sources/Entwine/Operators/ShareReplay.swift


補足になりますが
ConnectablePublisherプロトコルには
autoconnectというメソッドもありこれを呼び出した場合は
subscribeすると自動で処理を開始して値の出力を始めます。

Future

少し形が違いますが
sharemulticastと同様に
処理の結果を複数のSubscriberで共有できます。

他と違う特徴としては
処理が実行されるタイミングが

初めてSubscribeされたタイミング

ではなく

Futureインスタンスが生成されたタイミング

になります。

下記の例を見ていきます。


var cancellables: Set<AnyCancellable> = []
let future = Future<Data, URLError> { promise in
    URLSession.shared
        .dataTaskPublisher(for: URL(string: "https://www.google.com")!)
        .map(\.data)
        .print("shared")
        .sink(receiveCompletion: {
            if case .failure(let error) = $0 {
                promise(.failure(error))
            }
        }, receiveValue: {
            promise(.success($0))
        }).store(in: &cancellables)
}

// 出力結果
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
shared: receive value: (14676 bytes)
shared: receive finished

上記のようにsubscribeしていない状態でも
クロージャ内の処理が実行されていることがわかります。

ではsubscribeしてみます。


var cancellables: Set<AnyCancellable> = []
let future = Future<Data, URLError> { promise in
    URLSession.shared
        .dataTaskPublisher(for: URL(string: "https://www.google.com")!)
        .map(\.data)
        .print("shared")
        .sink(receiveCompletion: {
            if case .failure(let error) = $0 {
                promise(.failure(error))
            }
        }, receiveValue: {
            promise(.success($0))
        }).store(in: &cancellables)
}

print("subscribe 1回目")
future
    .sink(receiveCompletion: { print("subscription1 receiveCompletion: '\($0)'") },
            receiveValue: { print("subscription1 receiveValue: '\($0)'") })
    .store(in: &cancellables)

print("subscribe 2回目")
future
    .sink(receiveCompletion: { print("subscription2 receiveCompletion: '\($0)'") },
        receiveValue: { print("subscription2 receiveValue: '\($0)'") })
    .store(in: &cancellables)

// 出力結果
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
subscribe 1回目
subscribe 2回目
shared: receive value: (14676 bytes)
subscription1 receiveValue: '14676 bytes'
subscription1 receiveCompletion: 'finished'
subscription2 receiveValue: '14676 bytes'
subscription2 receiveCompletion: 'finished'
shared: receive finished

このようにFuture内の処理は1回しか行われていませんが
2つのSubscriberはどちらも値を受け取ることができています。

では処理が完了した後に
subscribeした場合はどうでしょうか?


var cancellables: Set<AnyCancellable> = []
let future = Future<Data, URLError> { fulfill in
    URLSession.shared
        .dataTaskPublisher(for: URL(string: "https://www.google.com")!)
        .map(\.data)
        .print("shared")
        .sink(receiveCompletion: {
            if case .failure(let error) = $0 {
                fulfill(.failure(error))
            }
        }, receiveValue: {
            fulfill(.success($0))
        }).store(in: &cancellables)
}

print("subscribe 1回目")
future
    .sink(receiveCompletion: { print("subscription1 receiveCompletion: '\($0)'") },
            receiveValue: { print("subscription1 receiveValue: '\($0)'") })
    .store(in: &cancellables)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    print("subscribe 2回目")
    future
        .sink(receiveCompletion: { print("subscription2 receiveCompletion: '\($0)'") },
            receiveValue: { print("subscription2 receiveValue: '\($0)'") })
        .store(in: &cancellables)
}

// 出力結果
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
subscribe 1回目
shared: receive value: (14676 bytes)
subscription1 receiveValue: '14676 bytes'
subscription1 receiveCompletion: 'finished'
shared: receive finished
subscribe 2回目
subscription2 receiveValue: '14676 bytes'
subscription2 receiveCompletion: 'finished'

上記のように2回目のsubscribeでも
値を受け取れていることがわかりました。

Futureの注意点としては
処理が実行されるタイミングが
Futureインタンスが生成された時点になりますので
subscribeした後に処理が実行されることを期待していると
予期せぬ挙動に遭遇する可能性があります。

まとめ

出力結果を共有できるメソッドやクラスについて見ていきました。

メモリの使用量や通信量を抑えることができるという点で
非常に有用なものですが

いつPublisherは処理を開始して値を出力し始めるのか?
いつsubscribeすると値を受け取ることができるのか?

といったことを知らないと
「何が起きているんだ。。。?」
と悩んでしまうような事象に出くわすかもしれません。

そのような自体にならないためにも
違いを比較して見ていくことは大切だなと感じています。

Combine.frameworkはメソッドやクラスがたくさんあり
全てをは把握することは大変ですが
ある分類に分けて少しずつみていくと
効率的に把握できるのかなと思います。😃

もし間違いなどございましたらご指摘いただけますと助かります🙇🏻‍♂️

64
40
4

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
64
40

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?