UniRxについての記事のまとめはこちら
以前、RxのHotとColdについてでObservableのColdとHotという性質について説明しました。
今回はより具体的に、どういうシチュエーションでHot変換をするべきかを説明したいと思います。
#Hot変換するべきポイント
いろいろシチュエーションはありますが、一番Hot変換が重要となるシチュエーションは1つのストリームを複数回Subscribeする場合です。実際のコードを見ながら説明したいと思います。
例)入力された文字列が特定キーワードに一致するか調べる
Hot変換が必要な例として、「入力されたキー入力を監視して4文字の特定のキーワードが入力されたかを調べるストリーム」を作ってみます。
###準備
まずはその準備として入力されたキー情報を4文字ずつにまとめるストリームを作ります。
var keyBufferStream
= Observable.FromEvent<KeyEventHandler, KeyEventArgs>(
h => (sender, e) => h(e),
h => KeyDown += h,
h => KeyDown -= h)
.Select(x => x.Key.ToString()) //入力キーを文字に変換
.Buffer(4, 1) //4つずつにまとめる
.Select(x => x.Aggregate((p, c) => p + c)); //文字から文字列に変換
//結果を表示してみる
keyBufferStream.Subscribe(Console.WriteLine);
ABCD
BCDE
CDEF
DEFG
EFGH
このようにkeyBufferStreamは入力キーが4文字ずつにまとまって流れるストリームです。
###keyBufferStreamを使って"HOGE"または"FUGA"の入力を監視する
それでは、このkeyBufferStreamを使って「HOGE」と「FUGA」を監視してみましょう。
Whereを挟みHOGE版とFUGA版で2回Subscribeすることにします。
keyBufferStream
.Where(x => x == "HOGE")
.Subscribe(_ => Console.WriteLine("Input HOGE"));
keyBufferStream
.Where(x => x == "FUGA")
.Subscribe(_ => Console.WriteLine("Input FUGA"));
Input HOGE
Input FUGA
それぞれの文字列に反応するストリームを作ってSubscribeすることができました。
めでたしめでたし…、
とは行きません。このストリームには大きな問題があります。
##何が問題なのか?
上記のストリームは何が問題なのか?それはkeyBufferStreamがほぼCold Observableで形成されているのが問題です。以前の記事でも説明しましたが、Cold Observableは枝分かれしません。Subscribeする度に都度新しくストリームを生成する特性があります。
そのため上記の様な書き方をしてしまうと以下の様な問題が発生することになります。
- 裏で多重にストリームが生成されてしまいメモリとCPUを無駄遣いする
- Subscribeしたタイミングによって流れてくる結果が違う(参考Cold Observableの性質)
var keyBufferStream
= Observable.FromEvent<KeyEventHandler, KeyEventArgs>(
h => (sender, e) => h(e),
h => KeyDown += h,
h => KeyDown -= h)
.Select(x => x.Key.ToString())
.Buffer(4, 1)
.Do(_=>Console.WriteLine("Buffered")) // ← BufferがOnNextを放出したタイミングでコンソールに出す
.Select(x => x.Aggregate((p, c) => p + c));
keyBufferStream
.Where(x => x == "HOGE")
.Subscribe(_ => Console.WriteLine("Input HOGE"));
keyBufferStream
.Where(x => x == "FUGA")
.Subscribe(_ => Console.WriteLine("Input FUGA"));
Buffered
Buffered //Bufferは1回しか動いていないはずなのに2回出力されている = ストリームが2重で動いている
Hot Observableがストリームの根源であるFromEventしか無いため、Subscribeする度にFromEventから新しくストリームが生成されてしまう動きになってしまっています。
##問題の解決策「Hot変換」
ここで最初の「Hot変換は1つのストリームを同時に複数Subscribeする場合に使う」という話に戻ります。
つまり、Hot変換することでストリームの分岐点を作成し、複数Subscribeした時にストリームを1つにまとめることができるようになるのです。
var keyBufferStream
= Observable.FromEvent<KeyEventHandler, KeyEventArgs>(
h => (sender, e) => h(e),
h => KeyDown += h,
h => KeyDown -= h)
.Select(x => x.Key.ToString())
.Buffer(4, 1)
.Select(x => x.Aggregate((p, c) => p + c))
.Publish() //PublishでHot変換(Publishが代表してSubscribeしてくれる)
.RefCount(); //RefCountはObserverが追加された時に自動Connectしてくれるオペレータ
keyBufferStream
.Where(x => x == "HOGE")
.Subscribe(_ => Console.WriteLine("Input HOGE"));
keyBufferStream
.Where(x => x == "FUGA")
.Subscribe(_ => Console.WriteLine("Input FUGA"));
Input HOGE
Input FUGA
Hot変換のやり方はいくつかありますが、最も手軽なものがPublish()とRefCount()を組み合わせるものです。
今回はHot変換の必要性について説明したいのでPublishとRefCountの詳細な説明については割愛させて頂きます。(説明としてはこちらが詳しいです)
#まとめ
- ストリームを意図的に分岐させたい時にHot変換を行う
- ストリームを生成して返すプロパティや関数を定義した場合は末尾でHot変換をしておくと安全である
- Hot変換を忘れるとメモリやCPUの無駄遣いになったりSubscribeのタイミングで挙動がズレたりする
- Hot変換のオペレータはいくつかあるが、Publish()+RefCount()の組み合わせが便利(万能ではない)