Posted at

いまさら聞けないReactive Extensions.3

More than 3 years have passed since last update.


Rxの世界と外の世界


流れのある何か

単一や関数、非同期メソッド、イベント、Task、IEnumerableなど、

即値で取得できるもの、遅延評価されるもの、不定期に発生するもの、非同期なもの、

様々ではあるが、すべてデータの流れ(ストリーム)として表現することができる。

すなわち、それらはすべて、IObservable<T> に置き換えることが可能となる。


100という値を1つ流すデータストリームを作る。

IObservable<int> stream = Observable.Return( 100 );



時間軸

流れとは、時間の流れである。


ほぼ0秒で100という値が流れてくるデータストリーム

IObservable<int> stream = Observable.Return( 100 );



  • 若干の時間が掛かりそうな非同期メソッドは、数秒後に結果が流れが発生するもの。

  • タイマーは、指定時間後、時間毎に発生する流れ。

  • 配列やIEnumerable<T>は、約0秒間隔で発生する連続した値の流れ。

  • 関数は、約0秒後に戻り値が流れとして発生するもの。

時間を扱うことができるようになると、時間で制御できる。

例えば、短時間で連続的に流れが発生する何かがあって、落ち着いたら処理したいような場合、Throttleオペレータを使うことで、指定した時間、流れが起きなければ、後続に流すといった制御が可能になる。


1秒以上、落ち着いたら後続に流す

provider

.Throttle( TimeSpan.FromSeconds( 1 ) )
.Subscribe( Console.WriteLine );


関数であること

Rxは、関数型プログラミングの影響を受けている。

関数とは、0個以上の引数を取り、何かしら返すもの。

戻り値のあるメソッドであれば、関数と見なせる。

逆に戻り値が無ければ、それは関数にならない。


Unit

特にデータや戻り値が無い場合、Unitを使う。

Unitは、値が無いことを示すものでvoidと同じようなもの。


戻り値がないことを示す関数

Unit Function( int x ){

return Unit.Default;
}

特に処理するようなデータは無いものの、通知だけしたいような場合、

ただ流れが発生するだけの何かなら、IObservable<Unit>にすれば良い。

Unitの利用例

いくつかのIObservableを合成して1つのストリームとして扱いたいことがある。

Mergeオペレータを使うと流れを合成することができる。

しかし、同じ型の流れのみ合成可能だという制限がある。


異なる型の流れなのでコンパイルエラー

Observable.Timer( TimeSpan.FromMinutes( 30 )  ) 

.Merge( Observable.FromEventPattern<EventArgs>( x => Click += x , x => Click -= x ) )

Timerは、IObservable<long>型の流れ、FromEventPatternは、IObservable>型の流れ。

.Select( _ => Unit.Default )


Unitとして統一することで同じ型の流れになり、合成可能になる。

Observable.Timer( TimeSpan.FromMinutes( 30 ) )

.Select( _ => Unit.Default )
.Merge( Observable.FromEventPattern<EventArgs>( x => Click += x , x => Click -= x )
.Select( _ => Unit.Default )
);

.Select( _ => Unit.Default )は、よく使うので拡張メソッドを作っておくと楽できる。

public static IObservable<Unit> AsUnit<T>( this IObservable<T> source ) 

=> source.Select( _ => Unit.Default );



IObservable<T>を基準として考える

Rxの世界では、1 より、Observable.Return(1)のほうが自然な表現なので、

外部からの入力は、IObservable<T>に何らかの手段で変換する必要がある。

Task<T>などIObservableでないものを受け取るオペレータがあったとしても、それは利便性のためで、

内部では、だいたいIObservable相当なものに変換しているものと捉えておけば良い。

Task<T>は、T型のデータが1つ流れてきて完了するIObservable<T>である。

IObservable<T>が基準であると捉えられれば、Catchオペレータなど、

IObservable<T>を返すように期待している設計がRxにおいて自然体であり、違和感が無くなるだろう。