はじめに
ReactiveExtensions(以下Rx)の鬼門はHot-Coldの概念とHot変換だと思います。
- Hot変換が必要なのはストリームを枝分かれさせたいときである…
- Hot変換するには
Publish()
してConnect()
すれば良い…
等々はいろいろな記事で紹介されていますが、ではPublish()
すると裏で一体何が起こるのか?Connect()
すると何が起こる?というのは、あまり詳しく解説されている記事が少ない印象です。
そこで、本記事ではRxの鬼門Publish()
メソッドを「とりあえず」で使うのを卒業し、何が行われているかを「完全に理解」できるように解説していきます。
[前提知識]HotとCold
まずは前提となるHotとColdの知識が必要です。
別記事で書きましたので、もしHotColdの知識が曖昧な方はご覧ください。
Subject<T>
クラスの役割とHotColdとの関係性についても解説しています。
↑の記事の内容を要約すると次の3つに言及しています。
1.Subject<T>
クラスの特徴について
Subject<T>
クラスは1つのObservableシーケンスを複数の購読者に対してマルチキャストする機能を持つ。
2.HotなIObservable
とColdなIObservable
の違いについて
HotなIObservable
とは、Subject<T>
クラスから出ているIObservable
のことを指す。
実体はSubject<T>
クラスなので、複数の購読者に対してObservableシーケンスをマルチキャストできるという特徴を持つ。
対して、それ以外の(Subject
以外から出ているような)IObservable
はColdと呼ばれる。
UniRxのオペレータから出てくるIObservable
はほぼColdである。
3.Observableソースについて
Observableソースとは、Observableシーケンスに流れてくる値の発生源のこと。これにもHotとColdがある。
HotなObservableソースは、何回Subscribe
されてもObservableシーケンスは1つ。Subscribe
した時点以降の通知が届くようになる。
Subject
がObservableソースの場合は、HotなObservableソースとなる。
対してColdなObservableソースの場合、Subscribe
されたらその都度専用のObservableシーケンスが生成される。
専用のObservableシーケンスなので、いつSubscribe
してもすべての通知が届く。
主にファクトリメソッド等から生成される場合、ColdなObservableソースとなる。
これらの内容を踏まえた上で、Publish
オペレータについて解説していきます。
Cold→Hot変換するにはどうすればよいのか
前述したHot, Coldの定義と、Subject<T>
クラスの特徴を踏まえて、ColdなIObservable
をHotなIObservable
に変換するにはどうすれば良いのかを考えます。
要は、Subject
クラスから出ているIObservable
ならば確実にHotなのですから、「ColdなObservableをSubject
クラスから再発信すれば良いのではないか」と考えることができます。
具体的には、次の手順を踏みます。
1.Subject<T>
クラスを用意します。
2. Subject<T>
は、Hot変換したいColdなIObservable<T>
を代表して1回だけSubscribe
します。
3. Subscribe
すると、Subject<T>
に対してColdなIObservable<T>
から通知が流れてきます。
4. Subject<T>
クラスのIObservable<T>
を外部に公開します。
Subject<T>
クラスから出ているIObservable<T>
はHotなIObservable<T>
なので、Hot変換が完了しました。
Subject<T>
クラスを使うことで、無事にHot変換を行うことができました。
Publish
すると何が起こる
Hot変換する手順は上記の通り。じゃあPublish
オペレータは上記のことをやっているんだね!と思ったら、実はそうではありません。なんと、Publish
オペレータだけではObservableシーケンスに対してなんの影響も一切与えません。つまり、Publish
オペレータをつけただけでは、Hot変換もまだ完了しないのです。
じゃあPublish
オペレータは何をしているのか?というと、いつでもHot変換ができるような「準備」をしています。
Publish
は「準備」するだけ
Publish
オペレータが実際に行っている処理は、次に示す「準備」のみに留まります。
- Hot変換用の
Subject<T>
クラスを用意する。 - Hot変換したい元となるColdな
IObservable
を用意する。
用意しているだけで、実際にSubject<T>
やIObservable<T>
に対して何らかの操作をしているわけではないことがポイントです。
準備したものに対する「操作」は次の工程に委託する
実際にHot変換が完了するには、あと残り2つの操作を行うことが必要です。
-
Subject<T>
に変換元のColdなIObservable
をSubscribe
させる -
Subject<T>
クラスのIObservable<T>
をSubscribe
する(←Hot Observableに変換完了)
これら2つの操作が完了したとき、はじめてHot変換が完了したことになります。
しかし、これらの操作はPublish
内では行われず、次の工程に委託されます。具体的には、Publish
オペレータの戻り値として、これら2つの操作のみが可能なインターフェイス返すことで、次の工程への委託を実現します。
そのインターフェイスこそが、IConnectableObservable<T>
インターフェイスです。
IConnectableObservable<T>
インターフェイス
IConnectableObservable<T>
インターフェイスとは、その名の通りConnect
ができるIObservable<T>
インターフェイスです。
定義もまんま、Connect
ができるようになったIObservable<T>
です。
public interface IConnectableObservable<T> : IObservable<T>
{
IDisposable Connect();
}
つまり、IConnectableObservable<T>
インターフェイスはConnect
とSubscribe
のみができるインターフェイスということになります。
そして、前項で説明したHot変換に必要な2つの操作が、それぞれConnect
とSubscribe
に割り当てられています。
それぞれのメソッドを呼び出すことで、各操作が完了してHot変換が完了することになります。
メソッド名 | 割り当てられた操作 |
---|---|
Connect |
Publish が用意した内部のSubject<T> に、変換元のColdなIObservable をSubscribe させる |
Subscribe |
IObservable<T> として公開されたSubject<T> をSubscribe する |
図で表すと次のようになります。
次項から更に詳しく解説していきます。
IConnectableObservable<T>
インターフェイス
Publish
オペレータの戻り値はIConnectableObservable<T>
インターフェイスであるということ、そしてIConnectableObservable<T>
インターフェイスはConnect
とSubscribe
の2つの操作が可能であることを説明しました。
まだピンとこないと思うので、まずはいくつかのHot変換の実例に則ってIConnectableObservable<T>
インターフェイスの使い方を説明します。
IConnectableObservable<T>
の使い方
例1:いくつかのオペレータを追加したIObservable
をHot変換して使い回したい
オペレータはColdなので複数回Subscribe
されると無駄な処理が走ることになります。
それを防ぐためにHot変換を行う例です。
//Observableソースを作る
Subject<int> subject = new Subject<int>();
//Subjectからのint型の通知に対して何らかのオペレータを追加
//このまま複数回Subscribeされるとオペレータが重複生成されて無駄な処理が走る
IObservable<TimeSpan> sourceColdObservable =
subject.Do(n => Debug.Log($"{n}が通知されました")) //Hot変換されていることを確認するためのログ
.Where(n => n > 0)
.Select(n => TimeSpan.FromSeconds(n));
//ColdなIObservale「sourceColdObservable」をHot変換する準備をする
IConnectableObservable<TimeSpan> connectableObservable =
sourceColdObservable.Publish();
//Publish内部のSubjectにsourceColdObservableをSubscribeさせる
connectableObservable.Connect();
//Publish内部のSubject(つまりHotなIObservable)をSubscribeする
connectableObservable.Subscribe(time => Debug.Log($"購読者1:{time}"));
connectableObservable.Subscribe(time => Debug.Log($"購読者2:{time}"));
subject.OnNext(30);
subject.OnNext(60);
subject.OnNext(-20);
subject.OnNext(90);
結果
複数回Subscribe
してもDo
オペレータはOnNext
の分だけ通過してることがわかるので、Hot変換が成功していることがわかります。
Hot変換しないと…
ちなみにHot変換しないとこんな感じになります。
Do
オペレータがSubscribe
の数だけ呼ばれていることからも分かる通り、オペレータ処理が重複実行されてしまっています。
例2:ファクトリメソッドから生成されたObservableシーケンスを使い回したい
ファクトリメソッドから生成されたIObservable
は基本的にColdなので、Subscribe
するたびに新しいObservableシーケンスが生成されます。
1つのObservableシーケンスを複数回Subscribe
して使い回したいためにHot変換を行う例です。
//1秒毎にインクリメントする値を発行するColdなObservableソース
IObservable<long> observableTimer = Observable.Interval(TimeSpan.FromSeconds(1));
//Hot変換の準備をする
IConnectableObservable<long> connectableObservable =
observableTimer.Publish();
//Publish内でobservableTimerをSubscribe
//→Observableシーケンスが生成され、タイマーが開始される
connectableObservable.Connect();
//Publish内のSubjectをSubscribe
connectableObservable.Subscribe(l => Debug.Log($"最初からの購読者【{l}】"));
//2.5秒後にもう一度Publish内のSubjectをSubscribe
await Task.Delay(2500);
connectableObservable.Subscribe(l => Debug.Log($"2.5秒遅れの購読者【{l}】"));
結果
Hot変換することで、「最初からの購読者」、「2.5秒遅れの購読者」に対して、同一の値が発行されていることが分かります。したがって、1つのObservableシーケンスを共有できていることがわかります。
Hot変換しないと…
Hot変換しないと、「2.5秒遅れの購読者」に対しては新しく0から値が発行されていることがわかります。つまり、それぞれObservableシーケンスが別個に生成されているということになります。
なぜConnect
が必要なのか
ところで、上記の2例ではどちらも
- Cold Observableを
Publish
- 出てきた
IConnectableObservable<T>
をConnect
-
IConnectableObservable<T>
を好きなだけSubscribe
の順に操作しています。
であれば、Connect
の存在意義とは一体何なのか?Publish
してConnect
するのなら、Publish
内で自動的にやっておいてくれれば良いのではないか?と思うかもしれません。
ところが、Connect
を呼び出すタイミングには重要な意味があって、場合によってはConnect
の呼び出すタイミングによって実行結果がまったく変わってくることがあります。
例えば、以下のような例では、Connect
を呼び出すタイミングが非常に重要になります。
①〜④のうち、どこでConnect()
を呼び出すべきでしょうか?
//1~10までの値を発行するColdなObservableソース
IObservable<int> observableRange = Observable.Range(1, 10);
IConnectableObservable<int> connectableObservable =
observableRange.Publish();
//①ここでConnect()?
connectableObservable.Subscribe(n => Debug.Log($"1個めのSubscribe:{n}");
//②ここでConnect()?
connectableObservable.Subscribe(n => Debug.Log($"2個めのSubscribe:{n}"));
//③ここでConnect()?
connectableObservable.Subscribe(n => Debug.Log($"3個めのSubscribe:{n}"));
//④ここでConnect()?
↓
↓
↓
↓
答えは、3つのSubscribe
にすべて値を通知させるためには、④の位置でConnect()
を呼び出す必要があります。
Connect()
の効果は「Publish
内のSubject<T>
に、変換元のColdなIObservableをSubscribe
させる」でした。
この場合、「変換元のColdなIObservable
」とはObservable.Range
ですので、Subscribe
した瞬間に1~10までの値が即座に流れてきてしまいます。
したがって、例えば①の位置でConnect()
を呼び出すと、誰もSubject
をSubscribe
しないまま1~10の値が流れてきてすぐにOnCompleted
するので、結果としてObservable.Range
が発行した値は虚無に消えます。
OnCompleted
したあとに3回Subscribe
していますが、もはやなんの値も流れてこないので、値がまったく通知されてこないことになってしまいます。
②や③でConnect()
した場合は、Connect()
以前にSubscribe
していた購読先には値が届きますが、Connect()
以降にSubscribe
した購読先には値は届きません。
④でConnect()
すれば、きちんとすべての購読先がSubscribe
を完了させ、準備万端な状態でObservable.Range
をSubscribe
するので、すべての購読先に対してObservable.Range
の発行値をマルチキャストすることができます。
このように、ファクトリメソッドなどのColdなObservableソースをHot変換する場合は、Connect()
を呼ぶタイミングが非常に重要になることが分かります。
Connect
のタイミングに左右されない場合
対して、Connect()
を呼ぶタイミングが重要ではない場合もあります。それは、ObservableソースがHotである場合です。
ObservableソースがHotとは、Subject
から値が生成されている場合を指します。
先ほどの例のObservable.Range
ファクトリメソッドの場合は、Subscribe
したタイミングで、値が生成されて発行されていくのでした。つまり、「購読側で値の発行タイミングを制御できている」と言えます。
これに対してSubject
から値が生成されているということは、値の発行タイミングがOnNext
が呼ばれるタイミングに委ねられることになります。つまり、「購読側では値が発行されるタイミングを制御することができない」、と言えます。
値の発行タイミングが制御できるのであれば、値の発行開始を示すConnect()
呼び出しのタイミングは前述の通り非常に重要になりますが、Subject
のように値の発行タイミングが制御できないのであれば、もはやConnect()
の呼び出すタイミングはいつでも良いです。いつSubscribe
しようが、値が流れてくるタイミングはこちらでは制御できないためです。
例えば、次のような例で考えてみます。
class SubjectProvider
{
private Subject<int> subject = new Subject<int>();
//中略
//Subjectに対して何らかのタイミングでOnNextにより値を発行している
//SubjectをIObservableとして公開している
public IObservable<int> Observable => subject;
}
class SubjectSubscriber
{
private SubjectProvider subjectProvider = new SubjectProvider();
public SubjectSubscriber()
{
//外部が公開しているSubjectに対してオペレータをつけてHot変換
IConnectableObservable<TimeSpan> connectableObservable =
subjectProvider.Observable
.Where(n => n > 0)
.Select(n => TimeSpan.FromSeconds(n))
.Publish();
//どこでConnectしよう?
connectableObservable.Subscribe(time => Debug.Log(time));
//どこでConnectしよう?
connectableObservable.Subscribe(time => Debug.Log(time));
//どこでConnectしよう?
connectableObservable.Subscribe(time => Debug.Log(time));
//どこでConnectしよう?
}
}
SubjectSubscriber
は、外部が公開しているSubject
を取得してHot変換をかけています。どこでConnect
しようか迷っているようですが、この場合はどこでConnect
しても結果は変わりません。
ObservableソースがSubject
の場合、いつSubscribe
しようが単一のObservableシーケンスをSubscribe
することになるので、Subscribe
のタイミングによって結果が左右されることがないのです。
ただ、もちろんSubscribe
のタイミングが大幅に遅れると、それまで発行された値は通知されなくなるので、例えば以下のようにするのは問題です。
IConnectableObservable<TimeSpan> connectableObservable =
subjectProvider.Observable
.Where(n => n > 0)
.Select(n => TimeSpan.FromSeconds(n))
.Publish();
connectableObservable.Subscribe(time => Debug.Log(time));
connectableObservable.Subscribe(time => Debug.Log(time));
connectableObservable.Subscribe(time => Debug.Log(time));
//10秒経ってからソースSubjectをSubscribe
await Task.Delay(10000);
connectableObservable.Connect();
このようにすると、connectableObservable
をSubscribe
してから10秒間に発行された値は購読先にマルチキャストされなくなってしまうため、このような極端な例はNGとなります。
逆に、早くConnect
する分にはまったく問題がありませんので、ObservableソースがHotであると分かっている場合は、IConnectableObservable<T>
を即Connect
してしまっても問題ないでしょう。
Connect
の戻り値のIDisposable
とは何なのか
ところで、Connect()
の戻り値はIDisposable
となっています。果たしてこのIDisposable
は、Subscribe
の戻り値のIDisposable
と何が違うのでしょうか?前に掲載した図をもう一度見てみると、違いはすぐに分かると思います。
この図を見ると、Connect
もSubscribe
もどちらもやっていることは「Subscribe
」ですね。
したがって、Connect
の戻り値のIDisposable
とは、Subject
が変換元のIObservable
をSubscribe
したときに出てきたIDisposable
のことであるということがわかります。
IConnectableObservable
のSubscribe
の戻り値のIDisposable
をDispose
すると、Subject
への購読がDispose
されるのと同様に、Connect
の戻り値のIDisposable
をDispose
すると、ソースとなるColdなIObservable
への購読がDispose
されるということになります。
Connect
またはSubscribe
をDispose
したときの挙動の違い
ではどちらをDispose
すべきなのか。
まず前提として、ただ単にDispose
したいだけ、購読を解除したいだけならば、どちらをDispose
しても挙動は変わりません。
Subscribe
のほうをDispose
すればSubject
自体への購読が直接解除されるし、Connect
のほうをDispose
すれば、Subject
に入ってくるソースからの通知が遮断されるので、結果的に通知が来なくなります。
挙動が変わってくるのは、Connect
をDispose
したあとに、再びConnect
(再接続)するような場合です。そして、これに関しても、ソースとなるColdなIObservable
のObservableソースがColdなのかHotなのかで挙動が随分変わってきます。
もし、ObservableソースがHot、つまりSubject
から値が生成されているならば、何度「切断/再接続」を行ったとしても、ただ単に接続されている場合は値が飛んでくる、切断した場合は値が飛んでこない、それだけの違いになります。
しかし、ObservableソースがCold、つまりファクトリメソッドなどから値が生成されている場合は、「切断/再接続」をするたびに、新しいObservableソースが生成されることになります。
例えば、以下の例を見てください。
Observable.Interval
をPublish
してConnect
したあと、3秒後に切断/再接続を行う例です。
IObservable<long> observableInterval = Observable.Interval(TimeSpan.FromSeconds(1));
IConnectableObservable<long> connectableObservable =
observableInterval.Publish();
connectableObservable.Subscribe(n => Debug.Log($"1個めのSubscribe:{n}"));
connectableObservable.Subscribe(n => Debug.Log($"2個めのSubscribe:{n}"));
//Observale.Intervalに接続する
IDisposable connection = connectableObservable.Connect();
//3秒経ったら切断して再接続する
await Task.Delay(3000);
Debug.Log("3秒経ったので切断/再接続");
connection.Dispose();
connectableObservable.Connect();
実行結果は次のようになります。
3秒経って「切断」を行うと、それまでのObservableシーケンスは破棄されます。そして、「再接続」を行うと、また新しいObservableシーケンスが生成されるため、インクリメントが0に戻っていることがわかります。
次の例のように、Observableソース自体をHot化しておけば、「切断/再接続」を行ってもObservableシーケンスは破棄されず、再接続しても途中から始まります。
IConnectableObservable<long> connectableInterval =
Observable.Interval(TimeSpan.FromSeconds(1))
.Publish();
connectableInterval.Connect(); //Observable.Interval自体をHot化(ObservableソースがHot化する)
IConnectableObservable<long> connectableObservable =
connectableInterval.Publish();
connectableObservable.Subscribe(n => Debug.Log($"1個めのSubscribe:{n}"));
connectableObservable.Subscribe(n => Debug.Log($"2個めのSubscribe:{n}"));
//Observale.IntervalをHot化したものに接続する
IDisposable connection = connectableObservable.Connect();
//3秒経ったら切断して再接続する
await Task.Delay(3000);
Debug.Log("3秒経ったので切断/再接続");
connection.Dispose();
connectableObservable.Connect();
挙動まとめ
Connect
のIDisposable
とSubscribe
のIDisposable
、それぞれの挙動をまとめると次のようになります。
Obsevableソース |
Connect をDispose
|
Subscribe をDispose
|
---|---|---|
Hot | 通知が来なくなる 再接続すると、そのタイミングからまた通知が再開される |
通知が来なくなる |
Cold | 通知が来なくなる 再接続すると、新しいObservableシーケンスが生成されるので最初から値が発行される |
通知が来なくなる |
まとめ
長くなってしまいましたがまとめると次のようになります。
- Hot変換とは、Observableシーケンスを
Subject
が代表して購読し、そこから再配信することである -
Publish
オペレータはHot変換の準備をしているだけ。Publish
をつけただけでは、Observableシーケンスになんの影響も与えない - ObservableソースがHotかColdかによって
Connect
すべきタイミングが異なる - 切断/再接続時の挙動もObservableソースのHotColdによって異なるので注意
自分の中でPublish
の理解が曖昧だったので記事にまとめましたがだいぶしっかりした知識になった気がします。
あと基本、外部にIObservable
を公開するときはHot変換して公開したほうが良さそうな気がします。外部からはこのIObservable
がHotかColdか、はたまたObservableソースがColdなのかHotなのかなんてわかりませんから。外部に公開するときはHot、これを徹底しておけばハマる確率がかなり減らせるのではないかと思いました。もちろん意図的にColdで公開したい!って場合は、わかるようにしておけばそれはそれで良いと思いますけどね。
では、最後までありがとうございました!
ご意見ご指摘等ありましたらコメントお願いします!