LoginSignup
14

More than 1 year has passed since last update.

【UniRx】Cold→Hot変換の代表、「Publishオペレータ」を完全に理解する

Posted at

はじめに

ReactiveExtensions(以下Rx)の鬼門はHot-Coldの概念とHot変換だと思います。

  • Hot変換が必要なのはストリームを枝分かれさせたいときである…
  • Hot変換するにはPublish()してConnect()すれば良い…

等々はいろいろな記事で紹介されていますが、ではPublish()すると裏で一体何が起こるのか?Connect()すると何が起こる?というのは、あまり詳しく解説されている記事が少ない印象です。

そこで、本記事ではRxの鬼門Publish()メソッドを「とりあえず」で使うのを卒業し、何が行われているかを「完全に理解」できるように解説していきます。

[前提知識]HotとCold

まずは前提となるHotとColdの知識が必要です。
別記事で書きましたので、もしHotColdの知識が曖昧な方はご覧ください。
Subject<T>クラスの役割とHotColdとの関係性についても解説しています。

↑の記事の内容を要約すると次の3つに言及しています。

1.Subject<T>クラスの特徴について

Subject<T>クラスは1つのObservableシーケンスを複数の購読者に対してマルチキャストする機能を持つ。

Subjectの役割_OnNext.png

2.HotなIObservableとColdなIObservableの違いについて

HotなIObservableとは、Subject<T>クラスから出ているIObservableのことを指す。

image.png

実体はSubject<T>クラスなので、複数の購読者に対してObservableシーケンスをマルチキャストできるという特徴を持つ。

対して、それ以外の(Subject以外から出ているような)IObservableはColdと呼ばれる。
UniRxのオペレータから出てくるIObservableはほぼColdである。

3.Observableソースについて

Observableソースとは、Observableシーケンスに流れてくる値の発生源のこと。これにもHotとColdがある。

HotなObservableソースは、何回SubscribeされてもObservableシーケンスは1つ。Subscribeした時点以降の通知が届くようになる。
SubjectがObservableソースの場合は、HotなObservableソースとなる。

Hot Observableソース.png

対してColdなObservableソースの場合、Subscribeされたらその都度専用のObservableシーケンスが生成される。
専用のObservableシーケンスなので、いつSubscribeしてもすべての通知が届く。
主にファクトリメソッド等から生成される場合、ColdなObservableソースとなる。

Cold Observableソース.png

これらの内容を踏まえた上で、Publishオペレータについて解説していきます。

Cold→Hot変換するにはどうすればよいのか

前述したHot, Coldの定義と、Subject<T>クラスの特徴を踏まえて、ColdIObservableHotIObservableに変換するにはどうすれば良いのかを考えます。
要は、Subjectクラスから出ているIObservableならば確実にHotなのですから、「ColdなObservableをSubjectクラスから再発信すれば良いのではないか」と考えることができます。

具体的には、次の手順を踏みます。

1.Subject<T>クラスを用意します。

Subjectクラス.png

2. Subject<T>は、Hot変換したいColdなIObservable<T>を代表して1回だけSubscribeします。

2.ColdをSubscribe.png

3. Subscribeすると、Subject<T>に対してColdなIObservable<T>から通知が流れてきます。

3.通知が流れてくる.png

4. Subject<T>クラスのIObservable<T>を外部に公開します。

Subject<T>クラスから出ているIObservable<T>はHotなIObservable<T>なので、Hot変換が完了しました。

4.Hot変換完了.png

Subject<T>クラスを使うことで、無事にHot変換を行うことができました。

Publishすると何が起こる

Hot変換する手順は上記の通り。じゃあPublishオペレータは上記のことをやっているんだね!と思ったら、実はそうではありません。なんと、PublishオペレータだけではObservableシーケンスに対してなんの影響も一切与えません。つまり、Publishオペレータをつけただけでは、Hot変換もまだ完了しないのです。

じゃあPublishオペレータは何をしているのか?というと、いつでもHot変換ができるような「準備」をしています。

Publishは「準備」するだけ

Publishオペレータが実際に行っている処理は、次に示す「準備」のみに留まります。

  • Hot変換用のSubject<T>クラスを用意する。
  • Hot変換したい元となるColdなIObservableを用意する。

用意しているだけで、実際にSubject<T>IObservable<T>に対して何らかの操作をしているわけではないことがポイントです。

準備したものに対する「操作」は次の工程に委託する

実際にHot変換が完了するには、あと残り2つの操作を行うことが必要です。

  1. Subject<T>に変換元のColdなIObservableSubscribeさせる
  2. Subject<T>クラスのIObservable<T>Subscribeする(←Hot Observableに変換完了

これら2つの操作が完了したとき、はじめてHot変換が完了したことになります。
しかし、これらの操作はPublish内では行われず、次の工程に委託されます。具体的には、Publishオペレータの戻り値として、これら2つの操作のみが可能なインターフェイス返すことで、次の工程への委託を実現します。
そのインターフェイスこそが、IConnectableObservable<T>インターフェイスです。

IConnectableObservable<T>インターフェイス

IConnectableObservable<T>インターフェイスとは、その名の通りConnectができるIObservable<T>インターフェイスです。
定義もまんま、ConnectができるようになったIObservable<T>です。

public interface IConnectableObservable<T> : IObservable<T>
{
    IDisposable Connect();
}

つまり、IConnectableObservable<T>インターフェイスはConnectSubscribeのみができるインターフェイスということになります。

そして、前項で説明したHot変換に必要な2つの操作が、それぞれConnectSubscribeに割り当てられています。
それぞれのメソッドを呼び出すことで、各操作が完了してHot変換が完了することになります。

メソッド名 割り当てられた操作
Connect Publishが用意した内部のSubject<T>に、変換元のColdなIObservableSubscribeさせる
Subscribe IObservable<T>として公開されたSubject<T>Subscribeする

図で表すと次のようになります。

ConnectableObservable.png

次項から更に詳しく解説していきます。

IConnectableObservable<T>インターフェイス

Publishオペレータの戻り値はIConnectableObservable<T>インターフェイスであるということ、そしてIConnectableObservable<T>インターフェイスはConnectSubscribeの2つの操作が可能であることを説明しました。
まだピンとこないと思うので、まずはいくつかのHot変換の実例に則ってIConnectableObservable<T>インターフェイスの使い方を説明します。

IConnectableObservable<T>の使い方

例1:いくつかのオペレータを追加したIObservableをHot変換して使い回したい

オペレータはColdなので複数回Subscribeされると無駄な処理が走ることになります。
それを防ぐためにHot変換を行う例です。

//Observableソースを作る
Subject<int> subject = new Subject<int>();

//Subjectからのint型の通知に対して何らかのオペレータを追加
//このまま複数回Subscribeされるとオペレータが重複生成されて無駄な処理が走る
IObservable<TimeSpan> sourceColdObservable =
    subject.Do(n => Debug.Log($"{n}が通知されました")) //Hot変換されていることを確認するためのログ
           .Where(n => n > 0)
           .Select(n => TimeSpan.FromSeconds(n));

//ColdなIObservale「sourceColdObservable」をHot変換する準備をする
IConnectableObservable<TimeSpan> connectableObservable =
    sourceColdObservable.Publish();

//Publish内部のSubjectにsourceColdObservableをSubscribeさせる
connectableObservable.Connect();

//Publish内部のSubject(つまりHotなIObservable)をSubscribeする
connectableObservable.Subscribe(time => Debug.Log($"購読者1:{time}"));
connectableObservable.Subscribe(time => Debug.Log($"購読者2:{time}"));

subject.OnNext(30);
subject.OnNext(60);
subject.OnNext(-20);
subject.OnNext(90);

結果

複数回SubscribeしてもDoオペレータはOnNextの分だけ通過してることがわかるので、Hot変換が成功していることがわかります。

image.png

Hot変換しないと…

ちなみにHot変換しないとこんな感じになります。
DoオペレータがSubscribeの数だけ呼ばれていることからも分かる通り、オペレータ処理が重複実行されてしまっています。

image.png

例2:ファクトリメソッドから生成されたObservableシーケンスを使い回したい

ファクトリメソッドから生成されたIObservableは基本的にColdなので、Subscribeするたびに新しいObservableシーケンスが生成されます。
1つのObservableシーケンスを複数回Subscribeして使い回したいためにHot変換を行う例です。

//1秒毎にインクリメントする値を発行するColdなObservableソース
IObservable<long> observableTimer = Observable.Interval(TimeSpan.FromSeconds(1));

//Hot変換の準備をする
IConnectableObservable<long> connectableObservable =
    observableTimer.Publish();

//Publish内でobservableTimerをSubscribe
//→Observableシーケンスが生成され、タイマーが開始される
connectableObservable.Connect();

//Publish内のSubjectをSubscribe
connectableObservable.Subscribe(l => Debug.Log($"最初からの購読者【{l}】"));
//2.5秒後にもう一度Publish内のSubjectをSubscribe
await Task.Delay(2500);
connectableObservable.Subscribe(l => Debug.Log($"2.5秒遅れの購読者【{l}】"));

結果

Hot変換することで、「最初からの購読者」、「2.5秒遅れの購読者」に対して、同一の値が発行されていることが分かります。したがって、1つのObservableシーケンスを共有できていることがわかります。

image.png

Hot変換しないと…

Hot変換しないと、「2.5秒遅れの購読者」に対しては新しく0から値が発行されていることがわかります。つまり、それぞれObservableシーケンスが別個に生成されているということになります。

image.png

なぜConnectが必要なのか

ところで、上記の2例ではどちらも

  1. Cold ObservableをPublish
  2. 出てきたIConnectableObservable<T>Connect
  3. IConnectableObservable<T>を好きなだけSubscribe

の順に操作しています。
であれば、Connectの存在意義とは一体何なのか?PublishしてConnectするのなら、Publish内で自動的にやっておいてくれれば良いのではないか?と思うかもしれません。
ところが、Connectを呼び出すタイミングには重要な意味があって、場合によってはConnectの呼び出すタイミングによって実行結果がまったく変わってくることがあります。

例えば、以下のような例では、Connectを呼び出すタイミングが非常に重要になります。
①〜④のうち、どこでConnect()を呼び出すべきでしょうか?

//1~10までの値を発行するColdなObservableソース
IObservable<int> observableRange = Observable.Range(1, 10);

IConnectableObservable<int> connectableObservable =
    observableRange.Publish();

//①ここでConnect()?
connectableObservable.Subscribe(n => Debug.Log($"1個めのSubscribe:{n}");
//②ここでConnect()?
connectableObservable.Subscribe(n => Debug.Log($"2個めのSubscribe:{n}"));
//③ここでConnect()?
connectableObservable.Subscribe(n => Debug.Log($"3個めのSubscribe:{n}"));
//④ここでConnect()?




答えは、3つのSubscribeにすべて値を通知させるためには、④の位置Connect()を呼び出す必要があります。

Connect()の効果は「Publish内のSubject<T>に、変換元のColdなIObservableをSubscribeさせる」でした。
この場合、「変換元のColdなIObservable」とはObservable.Rangeですので、Subscribeした瞬間に1~10までの値が即座に流れてきてしまいます

したがって、例えば①の位置でConnect()を呼び出すと、誰もSubjectSubscribeしないまま1~10の値が流れてきてすぐにOnCompletedするので、結果としてObservable.Rangeが発行した値は虚無に消えます
OnCompletedしたあとに3回Subscribeしていますが、もはやなんの値も流れてこないので、値がまったく通知されてこないことになってしまいます。

②や③でConnect()した場合は、Connect()以前にSubscribeしていた購読先には値が届きますが、Connect()以降にSubscribeした購読先には値は届きません。

④でConnect()すれば、きちんとすべての購読先がSubscribeを完了させ、準備万端な状態でObservable.RangeSubscribeするので、すべての購読先に対してObservable.Rangeの発行値をマルチキャストすることができます。

このように、ファクトリメソッドなどのColdなObservableソースをHot変換する場合は、Connect()を呼ぶタイミングが非常に重要になることが分かります。

Connectのタイミングに左右されない場合

対して、Connect()を呼ぶタイミングが重要ではない場合もあります。それは、ObservableソースがHotである場合です。
ObservableソースがHotとは、Subjectから値が生成されている場合を指します。

先ほどの例のObservable.Rangeファクトリメソッドの場合は、Subscribeしたタイミングで、値が生成されて発行されていくのでした。つまり、「購読側で値の発行タイミングを制御できている」と言えます。
これに対してSubjectから値が生成されているということは、値の発行タイミングがOnNextが呼ばれるタイミングに委ねられることになります。つまり、「購読側では値が発行されるタイミングを制御することができない」、と言えます。

値の発行タイミングが制御できるのであれば、値の発行開始を示すConnect()呼び出しのタイミングは前述の通り非常に重要になりますが、Subjectのように値の発行タイミングが制御できないのであれば、もはやConnect()の呼び出すタイミングはいつでも良いです。いつSubscribeしようが、値が流れてくるタイミングはこちらでは制御できないためです。

例えば、次のような例で考えてみます。

class SubjectProvider
{
    private Subject<int> subject = new Subject<int>();

    //中略
    //Subjectに対して何らかのタイミングでOnNextにより値を発行している

    //SubjectをIObservableとして公開している
    public IObservable<int> Observable => subject;
}

class SubjectSubscriber
{
    private SubjectProvider subjectProvider = new SubjectProvider();

    public SubjectSubscriber()
    {
        //外部が公開しているSubjectに対してオペレータをつけてHot変換
        IConnectableObservable<TimeSpan> connectableObservable =
            subjectProvider.Observable
                           .Where(n => n > 0)
                           .Select(n => TimeSpan.FromSeconds(n))
                           .Publish();
        //どこでConnectしよう?
        connectableObservable.Subscribe(time => Debug.Log(time));
        //どこでConnectしよう?
        connectableObservable.Subscribe(time => Debug.Log(time));
        //どこでConnectしよう?
        connectableObservable.Subscribe(time => Debug.Log(time));
        //どこでConnectしよう?
    }
}

SubjectSubscriberは、外部が公開しているSubjectを取得してHot変換をかけています。どこでConnectしようか迷っているようですが、この場合はどこでConnectしても結果は変わりません。
ObservableソースがSubjectの場合、いつSubscribeしようが単一のObservableシーケンスをSubscribeすることになるので、Subscribeのタイミングによって結果が左右されることがないのです。

ただ、もちろんSubscribeのタイミングが大幅に遅れると、それまで発行された値は通知されなくなるので、例えば以下のようにするのは問題です。

IConnectableObservable<TimeSpan> connectableObservable =
    subjectProvider.Observable
                   .Where(n => n > 0)
                   .Select(n => TimeSpan.FromSeconds(n))
                   .Publish();

connectableObservable.Subscribe(time => Debug.Log(time));
connectableObservable.Subscribe(time => Debug.Log(time));
connectableObservable.Subscribe(time => Debug.Log(time));

//10秒経ってからソースSubjectをSubscribe
await Task.Delay(10000);
connectableObservable.Connect();

このようにすると、connectableObservableSubscribeしてから10秒間に発行された値は購読先にマルチキャストされなくなってしまうため、このような極端な例はNGとなります。

逆に、早くConnectする分にはまったく問題がありませんので、ObservableソースがHotであると分かっている場合は、IConnectableObservable<T>を即Connectしてしまっても問題ないでしょう。

Connectの戻り値のIDisposableとは何なのか

ところで、Connect()の戻り値はIDisposableとなっています。果たしてこのIDisposableは、Subscribeの戻り値のIDisposableと何が違うのでしょうか?前に掲載した図をもう一度見てみると、違いはすぐに分かると思います。

ConnectableObservable.png

この図を見ると、ConnectSubscribeもどちらもやっていることは「Subscribeですね。

したがって、Connectの戻り値のIDisposableとは、Subjectが変換元のIObservableSubscribeしたときに出てきたIDisposableのことであるということがわかります。
IConnectableObservableSubscribeの戻り値のIDisposableDisposeすると、Subjectへの購読がDisposeされるのと同様に、Connectの戻り値のIDisposableDisposeすると、ソースとなるColdなIObservableへの購読がDisposeされるということになります。

ConnectまたはSubscribeDisposeしたときの挙動の違い

ではどちらをDisposeすべきなのか。

まず前提として、ただ単にDisposeしたいだけ、購読を解除したいだけならば、どちらをDisposeしても挙動は変わりません。
SubscribeのほうをDisposeすればSubject自体への購読が直接解除されるし、ConnectのほうをDisposeすれば、Subjectに入ってくるソースからの通知が遮断されるので、結果的に通知が来なくなります。

挙動が変わってくるのは、ConnectDisposeしたあとに、再びConnect(再接続)するような場合です。そして、これに関しても、ソースとなるColdなIObservableのObservableソースがColdなのかHotなのかで挙動が随分変わってきます

もし、ObservableソースがHot、つまりSubjectから値が生成されているならば、何度「切断/再接続」を行ったとしても、ただ単に接続されている場合は値が飛んでくる、切断した場合は値が飛んでこない、それだけの違いになります。
しかし、ObservableソースがCold、つまりファクトリメソッドなどから値が生成されている場合は、「切断/再接続」をするたびに、新しいObservableソースが生成されることになります。

例えば、以下の例を見てください。
Observable.IntervalPublishしてConnectしたあと、3秒後に切断/再接続を行う例です。

IObservable<long> observableInterval = Observable.Interval(TimeSpan.FromSeconds(1));

IConnectableObservable<long> connectableObservable =
    observableInterval.Publish();

connectableObservable.Subscribe(n => Debug.Log($"1個めのSubscribe:{n}"));
connectableObservable.Subscribe(n => Debug.Log($"2個めのSubscribe:{n}"));

//Observale.Intervalに接続する
IDisposable connection = connectableObservable.Connect();

//3秒経ったら切断して再接続する
await Task.Delay(3000);
Debug.Log("3秒経ったので切断/再接続");
connection.Dispose();
connectableObservable.Connect();

実行結果は次のようになります。

image.png

3秒経って「切断」を行うと、それまでのObservableシーケンスは破棄されます。そして、「再接続」を行うと、また新しいObservableシーケンスが生成されるため、インクリメントが0に戻っていることがわかります。

次の例のように、Observableソース自体をHot化しておけば、「切断/再接続」を行ってもObservableシーケンスは破棄されず、再接続しても途中から始まります。


IConnectableObservable<long> connectableInterval =
    Observable.Interval(TimeSpan.FromSeconds(1))
              .Publish();
connectableInterval.Connect(); //Observable.Interval自体をHot化(ObservableソースがHot化する)

IConnectableObservable<long> connectableObservable =
    connectableInterval.Publish();

connectableObservable.Subscribe(n => Debug.Log($"1個めのSubscribe:{n}"));
connectableObservable.Subscribe(n => Debug.Log($"2個めのSubscribe:{n}"));

//Observale.IntervalをHot化したものに接続する
IDisposable connection = connectableObservable.Connect();

//3秒経ったら切断して再接続する
await Task.Delay(3000);
Debug.Log("3秒経ったので切断/再接続");
connection.Dispose();
connectableObservable.Connect();

image.png

挙動まとめ

ConnectIDisposableSubscribeIDisposable、それぞれの挙動をまとめると次のようになります。

Obsevableソース ConnectDispose SubscribeDispose
Hot 通知が来なくなる
再接続すると、そのタイミングからまた通知が再開される
通知が来なくなる
Cold 通知が来なくなる
再接続すると、新しいObservableシーケンスが生成されるので最初から値が発行される
通知が来なくなる

まとめ

長くなってしまいましたがまとめると次のようになります。

  • Hot変換とは、ObservableシーケンスをSubjectが代表して購読し、そこから再配信することである
  • PublishオペレータはHot変換の準備をしているだけ。Publishをつけただけでは、Observableシーケンスになんの影響も与えない
  • ObservableソースがHotかColdかによってConnectすべきタイミングが異なる
  • 切断/再接続時の挙動もObservableソースのHotColdによって異なるので注意

自分の中でPublishの理解が曖昧だったので記事にまとめましたがだいぶしっかりした知識になった気がします。
あと基本、外部にIObservableを公開するときはHot変換して公開したほうが良さそうな気がします。外部からはこのIObservableがHotかColdか、はたまたObservableソースがColdなのかHotなのかなんてわかりませんから。外部に公開するときはHot、これを徹底しておけばハマる確率がかなり減らせるのではないかと思いました。もちろん意図的にColdで公開したい!って場合は、わかるようにしておけばそれはそれで良いと思いますけどね。

では、最後までありがとうございました!
ご意見ご指摘等ありましたらコメントお願いします!

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14