C#
ReactiveExtensions

[Rx入門] Subject詳解 / Hotな、ColdなObservableのこと

More than 3 years have passed since last update.

この記事は、既にRxをゴリゴリ動かしてリアクティブな世界を堪能してる人あたりを対象にしています。


Subject!!!!

RxにはSubjectというものがあります。今回はこのSubjectを中心にRxのことを適当に話します。


Subjectってなに

Subjectは既に色んな場所で解説されているとおり、IObservable<T>インターフェースとIObserver<T>インターフェースを両方実装したクラスです。といっても全然分からないので、さっさと動かしてみましょう。

public static void Main()

{
// subjectをつくる
var subject = new Subject<int>();

// subjectを購読する
var disposable = subject.Subscribe(
x => WriteLine($"OnNext: { x }"),
ex => WriteLine($"OnError: { ex.Message }"),
() => WriteLine("OnCompleted"));

// subjectのメソッドをよぶ
subject.OnNext(1);
subject.OnNext(4);
subject.OnNext(7);
subject.OnCompleted();
}

/*/ Output:

OnNext: 1
OnNext: 4
OnNext: 7
OnCompleted

/*/

Subject自体がIObservableのsourceとなっていますね。そしてOnNext()OnCompleted()のメソッドを呼び出すことでSubscribeの各メソッドが発火してることが確認できます。

このように、Subjectは値の送出を自分でコントロール可能なIObservableソースということになります。


で、なににつかうの

たとえば、


テスト

自分でIObservableな拡張メソッドを定義すること、あると思います。そんなとき、簡単に値を放り込んで動作確認してみるときとかに便利です。


通知可能な○○

適当にクラスのメンバーとして内包して、AsObservable()メソッドを通してIObservableを公開すると、Rxを使って購読可能な何かを非常に簡単に作ることが出来ます。たとえば、プロパティのSetterにOnNext()メソッドの呼び出しを挟むだけで、「値がセットされたときにそれが通知されるプロパティ」が完成します。まあ実用可能なレベルにするにはちょっと工夫しないといけないですけど、だいたいイメージ出来ると思います。


Subjectにも種類がある

Rxで定義されているSubjectには4つの種類があります。挙動が大きく違うのでちゃんと解説します。


Subject<T>

最もシンプルなSubjectです。上の解説で使用しているとおり、各Rxメソッドによる購読が可能で、値の送出メソッドを自分で呼び出して使用します。


BehaviorSubject<T>

これは、最新の値をキャッシュするSubjectです。インスタンスの生成時に初期値を与える必要があります。以降は、OnNext()が呼び出される度に最新の値をキャッシュして、Subscribeされた時は、キャッシュされた最新の値を即座に送出します。キャッシュの送出後はノーマルのSubjectと同様な挙動になります。


ReplaySubject<T>

これは、送出した値を全てキャッシュするSubjectです。OnNext()が呼び出される度に値をため込み、Subscribeされた時、それまでため込んだ値を全て送出します。こちらもキャッシュの送出後はノーマルのSubjectと同様な挙動になります。


AsyncSubject<T>

これだけは他のSubjectとはかなり別物です。OnNext()を何度呼び出しても購読者には何も通知されず、OnCompleted()を呼び出した瞬間、OnNext()で送り込んだ最後の値が購読者に通知されます。また、完了後にSubscribeされた場合は即座に最後の値を送出します。いわゆる非同期処理の結果通達・保持の動作を、長さ1のIObservableでエミュレートした存在と言えばいいでしょう。


Subjectのルール

というか、Rxのルールですが。

購読されているRxのソースは、


  • OnNextを0回以上呼び出し可能で

  • OnCompletedかOnErrorのどちらか片方を0回か1回呼び出し可能で

  • OnCompletedかOnErrorのどちらかが呼び出された後は、OnNext、OnCompleted、OnErrorを呼び出すことは出来ない

というルールを持っています。

Rxがイベントであると考えると、このルールの意味がわかると思います。エラーになったイベントが発火するわけがないですからね。送出完了を通知したイベントが次の値を出すこともありえません。Rxに実装されたSubjectはこのルールに違反した呼び出しを行った場合、命令を無視するようになっています。普通はやらないですけど自前でSubjectの類を実装する場面などではこのルールをちゃんと厳守するようにしましょう。


SubjectとRxの関係

Subjectの基本を解説しましたが、Rxをよく触っている人なら気がつくと思います、「SubjectはRxの中でも異質な存在」だと言うことに。


Rxの値はいったいどこからくるの

Rangeの話をしましょう。Observable.RangeをSubscribeすると、指定した範囲の値が「生成されて」、次々とOnNextに送られてきます。そう、一般的なObservableのジェネレータは、IObservableの中で値を生成しています。値は全てIObservableの中で完結しているのです。では、Subjectは?Subjectを扱う文脈内でOnNextに直接値を送り込んでいます。つまり、Subjectの扱う値はRxの「外」から来ているものなのです。通常のジェネレータが、Rxが値を管理している、いわゆる「マネージド」な値だとしたら、Subjectから来る値は「アンマネージド」な値と表現することができます。そしてこの違いをそのまま表すRxの用語があります。それが、ColdなObservableと、HotなObservable、です。


HotなObservable、ColdなObservable

ここで突然ですが、ObservableにはHot、Coldという分類(?)が存在します。これがRx入門者の理解を妨げると評判なのでかみ砕いて説明していきましょう。


HotとColdは何が違うのか??

とりあえず、この記事をよみましょう。

[Qiita] RxのHotとColdについて by @toRisouP

Hot/Coldの性質の違い、挙動の違いについて、非常にわかりやすい図で書かれた解説記事です。動作を覚えるにはこの記事を読めば完璧です。まずは読みましょう。

「HotとColdの違い」について一通り読んだところで、ここからは、「何がHotで何がColdなのか?」を理解して、そして使いこなしていく方法を書いていきます。


何がHotで何がColdなのか?

既に上でちょっと書いてますが、結論から言います。


  • Subjectから送出される値が「Hot」

  • それ以外が「Cold」

以上です。


くわしく

解説する前に、まずはRxのCold、Hotの動作を簡単に確認します。


Cold

本来、RxはColdな存在です。値の生成から処理までを全てRxの内部で完結させているのが正しい姿(というとアレだけどとりあえずそういうことにして)です。Observableジェネレータは、Subscribeに呼応して値の生成を行います。送出される値のプール自体がSubscribe毎に新しく生成されます。複数のSubscribeが同じソースを購読しても、お互いに影響を及ぼすことはありません。


ジェネレータ(Cold)の動作

void Test()

{
var timer = Observable.Timer(Zero, FromSeconds(1)); // TimerはObservableジェネレータ == Cold

timer.Take(5).Subscribe(x => WriteLine($"Subscribe1: { x }"));

Thread.Sleep(3000); // 3秒スレッドをブロック

timer.Take(5).Subscribe(x => WriteLine($"__Subscribe2: { x }"));
}

/*/ 実行結果

Subscribe1: 0
Subscribe1: 1
Subscribe1: 2
__Subscribe2: 0
Subscribe1: 3
__Subscribe2: 1
Subscribe1: 4
__Subscribe2: 2
__Subscribe2: 3
__Subscribe2: 4

/*/


上のコードを実行してみると、2つのSubscribeが3秒差で0-4の値を出力しています。Subscribe毎に値のプールが生成されていることがわかりますね。


Hot

では、Hotな存在について。HotなObservableは、Subjectを通って外から来た値をRxメソッドで処理するものです。ということは、Subscribe毎に値の生成処理なんていうものはなく、また、Subscribe毎に値のプールが異なるということもありません。外から来る値をRxが管理する方法は無いからです。なので、Hotな値は複数のSubscribeに対して同時に送出されます。Hotが分配と呼ばれたりするのはこれが理由です。


Subject(Hot)の動作

void Test()

{
var timer = Timer(); // HotなIObservable

timer.Take(5).Subscribe(x => WriteLine($"Subscribe1: { x }"));

Thread.Sleep(3000);

timer.Take(5).Subscribe(x => WriteLine($"__Subscribe2: { x }"));
}

// Rxの外部で動作するTimer + Subject
public IObservable<long> Timer()
{
var subject = new Subject<long>();
Task.Run(async () =>
{
for (var x = 0L; true; x++)
{
subject.OnNext(x); // 無限ループで値を送出
await Task.Delay(1000).ConfigureAwait(false);
}
});
return subject.AsObservable();
}

/*/ 実行結果

Subscribe1: 1
Subscribe1: 2
Subscribe1: 3
__Subscribe2: 3
Subscribe1: 4
__Subscribe2: 4
Subscribe1: 5
__Subscribe2: 5
__Subscribe2: 6
__Subscribe2: 7

/*/


Timer()は完全にRxの外部で動いているカウントアップタイマーです。Subjectを介してRxに入ってきた値をSubscribeしています。実行結果は上の通りです。これはもう説明するまでも無いでしょう。


HotなObservable

RxでHotと呼ばれるIObservableは、その全てがSubjectから送出されたものです。HotなObservableを返すメソッドは全て、内部にSubjectを置いているか、またはSubjectの簡易実装を内部に持っています。全てに共通するのは、外部から来ている「アンマネージド」な値を扱う/扱えるということ。Hotなメソッドをいくつかピックアップします。なんとなく意味がわかればいいです。



  • FromEvent() 内部にSubjectを持ち、C#のeventが発火した値をOnNextに放り込みます。


  • StartAsync() 内部にAsyncSubjectを持ち、非同期メソッドの戻り値をOnNextに放り込みます。


  • Task.ToObservable() 内部にAsyncSubjectを持ち、TaskのResultをOnNextに放り込みます。


Cold->Hot変換?

ColdなObservableとHotなObservableが本質的に別物であることはなんとなく理解していると思います。では、「ColdをHotに変換する」と一言で説明されがちなPublish()メソッドとは一体何なのでしょう?答えは簡単です。「一旦Subscribeで値をRxの外に取り出し、Subjectを使ってもう一度中に入れる」だけです。Publish()の最も簡易的な実装は以下になります。

public static IObservable<T> Publish<T>(this IObservable<T> source)

{
var subject = new Subject<T>();
source.Subscribe(
x => subject.OnNext(x),
ex => subject.OnError(ex),
() => subject.OnCompleted());
return subject.AsObservable();
}

この実装だと、Connect()メソッドを用意していないので、CurrentThreadSchedulerなCold Observableに接続すると後続に何も流れないという問題があります(致命的!)が、Cold->Hot変換という要件そのものは満たしています。ちなみに、上のコードで使用しているSubjectをReplaySubjectに置き換えればReplay()メソッドの出来上がりです。AsyncSubjectに置き換えればPublishLast()メソッドになります。また、IConnectableObservableのConnect()メソッドは、内部sourceのSubscribe()呼び出しと同じ意味です。


HotなObservableをつかう

といっても、一度Rxの中に入ってしまえば後は普通のRxソースと同じように扱えます。なので、注意点を挙げます。


  • Rxの拡張メソッドを自前で定義するとき、sourceを複数回参照すると、Cold/Hotで挙動が異なる

  • Subscribeを1つも接続していない場合、Hotな値は虚空に消える

  • PublishなどのCold->Hot変換メソッドのConnect()メソッド呼び出しのタイミングを意識する


その他てきとうなこと


Subjectの値はスケジューラに乗らない

以前、Rxは全ての処理がスケジューラに乗って動作するフレームワークだと説明しましたね。アレは嘘です。

というのはかなりアレなんですけど、実際嘘です。というのも、Subjectの送出する値(==Hotな値)は、Subjectの各送出メソッドを呼び出したコンテキスト上で値の送出を行います。なので、全ての値がスケジューラに乗っていることが保証されているObservableジェネレータとは違い、Subjectの値はスケジューラに乗りません。もちろん、Rxのスケジューラは生で扱えるので、Subjectの値送出操作をスケジューラに乗せることも可能です。が、ふつうはやらない。また、購読チェーンの中継メソッドでスケジューラの切り替えなどを行っている場合は、当然その時点からスケジューラを通るようになります。


Subjectデバッグの落とし穴

Subjectでのデバッグは手軽なのでよく使いますが、「Subjectの出す値はHot」「Subjectの値はスケジューラに乗らない」という2つのことを念頭に置いておかないとハマることがあります。特に、Cold/Hotで挙動が異なるパターンはよくあるので、意識しておかないと死にます。また、スケジューラ由来の落とし穴もたまーに存在するので注意しましょう。


まとめ

言ってることがごちゃごちゃになってしまったのでまとめます。


  • Subjectは直接値を投入できるIObservableソース

  • Subjectは値の投入が容易なので、Rxメソッドの動作確認などに使うと便利

  • SubjectはRxの「外」の値をIObservableに入れるためのゲートウェイ

  • 「HotなObservable」とは、Rxの「外」から来る値のこと

  • 「HotなObservable」とは、つまり、「Subjectから送出される値」のこと

  • Subjectは複数のSubscribeに対して同じ値を同時に流す(==分配する)

  • Rxネイティブのメソッドでは、外から来る値を扱うメソッドと明示的なCold->Hot変換を行うメソッド以外は全てCold

明快な概念なはずなのにごちゃごちゃした説明になってしまい申し訳ないです。言葉にするってむずかしい。こんなですけど少しでもRxの理解の手助けになれば良いと思います。

以上です。ありがとうございました。