【Reactive Extensions】 Hot変換はどういう時に必要なのか?

  • 123
    Like
  • 0
    Comment
More than 1 year has passed since last update.

UniRxについての記事のまとめはこちら


以前、RxのHotとColdについてでObservableのColdとHotという性質について説明しました。
今回はより具体的に、どういうシチュエーションでHot変換をするべきかを説明したいと思います。

Hot変換するべきポイント

いろいろシチュエーションはありますが、一番Hot変換が重要となるシチュエーションは1つのストリームを複数回Subscribeする場合です。実際のコードを見ながら説明したいと思います。

例)入力された文字列が特定キーワードに一致するか調べる

Hot変換が必要な例として、「入力されたキー入力を監視して4文字の特定のキーワードが入力されたかを調べるストリーム」を作ってみます。

準備

まずはその準備として入力されたキー情報を4文字ずつにまとめるストリームを作ります。

入力文字を4文字ずつにまとめるkeyBufferStream
var keyBufferStream
    = Observable.FromEvent<KeyEventHandler, KeyEventArgs>(
        h => (sender, e) => h(e),
        h => KeyDown += h,
        h => KeyDown -= h)
        .Select(x => x.Key.ToString()) //入力キーを文字に変換
        .Buffer(4, 1) //4つずつにまとめる
        .Select(x => x.Aggregate((p, c) => p + c)); //文字から文字列に変換

//結果を表示してみる
keyBufferStream.Subscribe(Console.WriteLine);
実行結果の例(ABCDEFGHとキー入力した結果)
ABCD
BCDE
CDEF
DEFG
EFGH

このようにkeyBufferStreamは入力キーが4文字ずつにまとまって流れるストリームです。

keyBufferStreamを使って"HOGE"または"FUGA"の入力を監視する

それでは、このkeyBufferStreamを使って「HOGE」と「FUGA」を監視してみましょう。
Whereを挟みHOGE版とFUGA版で2回Subscribeすることにします。

"HOGE"と"FUGA"をそれぞれ監視する
keyBufferStream
    .Where(x => x == "HOGE")
    .Subscribe(_ => Console.WriteLine("Input HOGE"));

keyBufferStream
    .Where(x => x == "FUGA")
    .Subscribe(_ => Console.WriteLine("Input FUGA"));
実行結果(hogefugaと入力した結果)
Input HOGE
Input FUGA

keyBufferStream.png

それぞれの文字列に反応するストリームを作ってSubscribeすることができました。
めでたしめでたし…、

とは行きません。このストリームには大きな問題があります。

何が問題なのか?

上記のストリームは何が問題なのか?それはkeyBufferStreamがほぼCold Observableで形成されているのが問題です。以前の記事でも説明しましたが、Cold Observableは枝分かれしません。Subscribeする度に都度新しくストリームを生成する特性があります。

そのため上記の様な書き方をしてしまうと以下の様な問題が発生することになります。

  • 裏で多重にストリームが生成されてしまいメモリとCPUを無駄遣いする
  • Subscribeしたタイミングによって流れてくる結果が違う(参考Cold Observableの性質)
ストリームが2重になっている証拠
var keyBufferStream
    = Observable.FromEvent<KeyEventHandler, KeyEventArgs>(
        h => (sender, e) => h(e),
        h => KeyDown += h,
        h => KeyDown -= h)
        .Select(x => x.Key.ToString())
        .Buffer(4, 1)
        .Do(_=>Console.WriteLine("Buffered")) // ← BufferがOnNextを放出したタイミングでコンソールに出す
        .Select(x => x.Aggregate((p, c) => p + c));

keyBufferStream
    .Where(x => x == "HOGE")
    .Subscribe(_ => Console.WriteLine("Input HOGE"));

keyBufferStream
    .Where(x => x == "FUGA")
    .Subscribe(_ => Console.WriteLine("Input FUGA"));
実行結果(AAAAとBufferが1回だけ動くようにキー入力)
Buffered
Buffered //Bufferは1回しか動いていないはずなのに2回出力されている = ストリームが2重で動いている

Cold.png
Hot Observableがストリームの根源であるFromEventしか無いため、Subscribeする度にFromEventから新しくストリームが生成されてしまう動きになってしまっています。

問題の解決策「Hot変換」

ここで最初の「Hot変換は1つのストリームを同時に複数Subscribeする場合に使う」という話に戻ります。
つまり、Hot変換することでストリームの分岐点を作成し、複数Subscribeした時にストリームを1つにまとめることができるようになるのです。

Hot.png

Hot変換後の例
var keyBufferStream
    = Observable.FromEvent<KeyEventHandler, KeyEventArgs>(
        h => (sender, e) => h(e),
        h => KeyDown += h,
        h => KeyDown -= h)
        .Select(x => x.Key.ToString())
        .Buffer(4, 1)
        .Select(x => x.Aggregate((p, c) => p + c))
        .Publish() //PublishでHot変換(Publishが代表してSubscribeしてくれる)
        .RefCount(); //RefCountはObserverが追加された時に自動Connectしてくれるオペレータ

keyBufferStream
    .Where(x => x == "HOGE")
    .Subscribe(_ => Console.WriteLine("Input HOGE"));

keyBufferStream
    .Where(x => x == "FUGA")
    .Subscribe(_ => Console.WriteLine("Input FUGA"));
実行結果(hogefugaと入力)
Input HOGE
Input FUGA

Hot変換のやり方はいくつかありますが、最も手軽なものがPublish()とRefCount()を組み合わせるものです。
今回はHot変換の必要性について説明したいのでPublishとRefCountの詳細な説明については割愛させて頂きます。(説明としてはこちらが詳しいです

まとめ

  • ストリームを意図的に分岐させたい時にHot変換を行う
  • ストリームを生成して返すプロパティや関数を定義した場合は末尾でHot変換をしておくと安全である
  • Hot変換を忘れるとメモリやCPUの無駄遣いになったりSubscribeのタイミングで挙動がズレたりする
  • Hot変換のオペレータはいくつかあるが、Publish()+RefCount()の組み合わせが便利(万能ではない)

参考 Introduction Rx - Hot and Cold observables