Edited at

いまさら聞けないReactive Extensions.1

More than 3 years have passed since last update.


Rxの基本

何らかのプロバイダ IObservable<T>Subscribeすることで監視することにより、

プロバイダから状態の通知(Push)をオブザーバで受け取ることによって構成されるオブザーバパターン。


プロバイダ

別称、データソースストリームなどと呼ぶことがある。

プロバイダの役割

ある状態の通知を行うのがプロバイダの役割である。

文脈によっては、通知ではなく、Push、発行、イベントなどと言うことがある。


状態の通知

プロバイダは、状態の通知をオブザーバに対して行う。

オブザーバは、インターフェース IObserver<T> を実装しており、下記のメソッドを公開している。

メソッド
概要

OnNext   
新しい値が発生したことを通知する。

OnError  
エラーが発生し、異常終了したことを通知する。

OnCompleted
正常に終了したことを通知する

これらのメソッドをプロバイダが適切に呼び出すことにより、状態の通知をオブザーバで受け取ることができる。


単純なプロバイダを作る

Observable.Createメソッドを使うと、汎用的なプロバイダを生成することができる。

例として、10という値を通知して終了するプロバイダを定義する。


10という値を発行して完了する

IObservable<int> provider = Observable.Create<int>( o => {

o.OnNext( 10 );
o.OnCompleted();
return System.Reactive.Disposables.Disposable.Empty;
} );

これにより、オブザーバがproviderを観測可能な仕組みができた。

観測可能な何かを表すのでインターフェースは、IObservable<T>となっている。


観測する

プロバイダを観測するには、Subscribeメソッドを使用する。

新しい値が通知されたときに何かする

プロバイダがOnNextを呼び出したときに何をすべきか定義する。


観測する

provider.Subscribe( x => Console.WriteLine( x ) );


これで、プロバイダから何らかの通知を受け取ったときに何をすべきか定義することができる。

正常終了時に何かする

プロバイダがOnCompletedを呼び出したときに何をすべきかを定義する。


完了時に何らかの処理を行う

provider.Subscribe( x => Console.WriteLine( x ) , () => Console.WriteLine("完了") );


異常終了時に何かする

プロバイダがOnErrorを呼び出したときに何をすべきかを定義する。


エラー発生時に何らかの処理を行う(エラーハンドリング)

provider.Subscribe( 

x => Console.WriteLine( x ) ,
e => Console.WriteLine( e.Message )
);

Subscribeメソッドは、複数のオーバーロードが定義されているが、実のところ、

IObservable<T>は、IDisposable Subscribe( IObserver<T> observer )に集約される。

わざわざIObserver<T>を実装したクラスなど作ってられないのでデリゲートで定義できるようになっているだけである。



観測の解除

Subscribeメソッドは、IDisposableを返す。

それを.Dispose() すると観測が解除される。

観測の解除は、次のいずれかによって行われる。


  • Subscribeによって返されたオブジェクトをDisposeした。

  • プロバイダがOnCompletedを呼び出し、正常終了した。

  • プロバイダがOnErrorを呼び出し、異常終了した。

確実にOnCompletedもしくは、OnErrorで終了するプロバイダであれば、わざわざDisposeを呼び出す必要はない。



プロバイダの性質

プロバイダの性質は、大きくわけて Hot / Cold に分類される。


Cold


  • Subscribeされることにより、作動を監視する。

  • オブザーバとは、1:1の関係を持つ。

ほとんどの場合、Coldな性質を持つプロバイダは、OnCompletedを呼び出す設計になっている。

当然ながら例外もある、例えば、Observable.Intervalは、指定時間間隔で通知し続けるColdなプロバイダであるが、OnCompletedを呼ばない。


Hot


  • Subscribeされていなくても作動している。

  • オブザーバとは、1:nの関係を持つ。

.NETのイベントのような特性で、リスナー(オブザーバ)が居なくても動き続ける。

その間の発生した通知は、破棄される。

Hotな性質を持つプロバイダは、OnCompletedを呼ばないものが多い。

そのため観測の必要が無くなったら、Disposeを呼び出す必要がある。



オペレータ

Rxの特徴の一つとして、オペレータという仕組みがある。


0~9までの値を通知するプロバイダ

var provider = Observable.Range( 0 , 10 );


これを何もせずに観測する。


そのまま観測する

provider.Subscribe( x => Console.WriteLine( x ) );


0

1
2
3
4
5
6
7
8
9

偶数のみの値を選択したい

C#では、以前によりLINQを使うことで、選択、射影、合成などのデータに対する操作を柔軟に行えた。

RxでもLINQ同様、いくつかの拡張メソッドを使うことにより、柔軟な操作を行えるようになっている。


2で割り切れる値のみを通す

provider

.Where( x => x % 2 == 0 )
.Subscribe( x => Console.WriteLine( x ) );

このようにWhereオペレータを挟み込むと、後続に通過させるべきかどうか判定することができる。

例では、2で割り切れる値のみ、後続に通過するようにしたので、結果は、次のようになります。

0

2
4
6
8


オペレータの仕組み

オペレータは、観測しながら後続のオブザーバに流す役目を持っています。

例えば、Whereは、重ね重ね、次のような実装になっています。

public static IObservable<T> Where<T>( this IObservable<T> source , Func<T , bool> predicate ) => new AnonymousObservable<T>(

observer => {
return source.Subscribe( x => {
if( predicate( x ) )
observer.OnNext( x );
} , observer.OnError , observer.OnCompleted );
}
);

predicateで値を判定して条件を満たすと、後続のオブザーバに渡すものの、

条件を満たさなければ、そのまま破棄されるという役目を持つオペレータになります。

またオペレータは、IObservable<T>の拡張メソッドとして定義されています。

上記の例のようにAnonymousObservable<T>クラスを用いることにより、自作することもできます。

オペレータの生成順序

オペレータは、上に辿るように生成されます。


オペレータの観測順序の検証

static class Program {

static void Main( string[] args ) {
Observable.Return( 100 )
.Dummy( "A" )
.Dummy( "B" )
.Dummy( "C" )
.Dummy( "D" )
.Dummy( "E" )
.Subscribe();

Console.ReadLine();
}

public static IObservable<T> Dummy<T>( this IObservable<T> source , string label ) => new AnonymousObservable<T>(
observer => {
Console.WriteLine( label );
return source.Subscribe( observer );
}
);
}


これを実行すると、次のように出力され、遡るように観測していることがわかります。

E

D
C
B
A



スケジューラ

Rxの重要な要素のひとつにスケジューラがある。

スケジューラにより、スレッドの切り替え、時間の制御を行うことができる。


スレッドの切り替え

WPFでもフォームでもUIに対する処理は、UIスレッド上で行う必要があります。

重い処理は、非同期でUIに対する処理は、UIスレッドで行うのが定石でしょう。

Rxでは、スケジュールを制御するためのオペレータが用意されています。


  • ObserveOn

  • SubscribeOn


SubscribeOn

SubscribeOnは、Subscribeを行うスケジューラを指定します。

using System;

using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApplication1 {
static class Program {
static void Main( string[] args ) {
Console.WriteLine( $"Main:{Thread.CurrentThread.ManagedThreadId}" );
Observable.Return( 100 )
.Dummy( "A" )
.SubscribeOn( ThreadPoolScheduler.Instance )
.Dummy( "B" )
.Dummy( "C" )
.Dummy( "D" )
.Dummy( "E" )
.Subscribe();

Console.ReadLine();
}

public static IObservable<T> Dummy<T>( this IObservable<T> source , string label ) => new AnonymousObservable<T>(
observer => {
Console.WriteLine( $"Subscribe->{label}:{Thread.CurrentThread.ManagedThreadId}" );
return source.Subscribe( x => {
Console.WriteLine( $"OnNext->{label}:{Thread.CurrentThread.ManagedThreadId}" );
observer.OnNext( x );
} , observer.OnError , observer.OnCompleted );
}
);
}
}

わかりやすくするため、スレッドIDを含めて出力するようにしました。

実行結果は、次のようになります。

Main:9

Subscribe->E:9
Subscribe->D:9
Subscribe->C:9
Subscribe->B:9
Subscribe->A:11
OnNext->A:11
OnNext->B:11
OnNext->C:11
OnNext->D:11
OnNext->E:11

AのSubscribeがメインスレッドとは異なるスレッドで行われています。

.Dummy( "A" )

.SubscribeOn( ThreadPoolScheduler.Instance )

SubscribeOnでは、直属のオペレータやプロバイダに対するSubscribeをどのスケジューラで行うか指定できます。


ObserveOn

ObserveOnは、OnNextに対するスケジューラを指定します。

Observable.Return( 100 )

.Dummy( "A" )
.ObserveOn( ThreadPoolScheduler.Instance )
.Dummy( "B" )
.Dummy( "C" )
.Dummy( "D" )
.Dummy( "E" )
.Subscribe();

実行結果は、次のようになります。

OnNext->Bからスレッドが切り替わっていることがわかります。

Main:9

Subscribe->E:9
Subscribe->D:9
Subscribe->C:9
Subscribe->B:9
Subscribe->A:9
OnNext->A:9
OnNext->B:11
OnNext->C:11
OnNext->D:11
OnNext->E:11

ObserveOnは、後続にOnNextするスケジューラを指定します。


Rxで提供されているスケジューラ


  • ThreadPoolScheduler.Instance

    スレッドプール上で処理を行うスケジューラで非同期絡みでよく使われる。


  • TaskPoolScheduler.Default

    タスクプール(TPL)上で処理を行う、ThreadPoolSchedulerと同種と見て構わない。


  • Scheduler.CurrentThread

    カレントスレッドで逐次的に処理を行う。


  • Scheduler.Immediate

    カレントスレッドで即時的に処理を行う。


  • NewThreadScheduler.Default

    新しいスレッドを生成して、そのスレッド上で処理を行う。


  • new SynchronizationContextScheduler( context )

    SynchronizationContextを受け取り、そのSynchronizationContext上で処理する。


また、Scheduler.Default としてデフォルトのスケジューラも用意されているが、通常は、ThreadPoolScheduler.Instanceと同じである。

ちなみにいずれのスケジューラもISchedulerインターフェースを継承している。

既定のスケジューラ

プロバイダやオペレータによっては、既定のスケジューラが指定されているものがある。

例えば、Observable.Intervalは、ThreadPoolSchedulerがデフォルトとして指定されたプロバイダであり、ThrottleやDelayも同じく、ThreadPoolSchedulerをデフォルトとして指定されたオペレータとなっている。

このような性質があることを知らずにThrottleを使ったりすると、スレッドプール上からUIを操作してしまうようなことになりかねないので注意する必要がある。


時間の取り扱い

スケジューラの役割は、スレッドを切り替えるだけではない。

時間上に乗せることができる。

エラーが発生したら、指定した時間だけ遅延してエラーを通知するオペレータを作ってみる。

使い道としては、再試行の前に遅延させたいようなケースだろうか。


エラーを遅延するオペレータ

public static IObservable<T> DelayError<T>( this IObservable<T> source , TimeSpan delay ) => new AnonymousObservable<T>(

observer => {
var scheduleToken = new SerialDisposable();

return StableCompositeDisposable.Create(
scheduleToken ,
source.Subscribe( observer.OnNext , e => {
scheduleToken.Disposable = ThreadPoolScheduler.Instance.Schedule( delay ,
() => observer.OnError( e )
);
} , observer.OnCompleted )
);
}
);


エラーが発生したら3秒後にエラーを通知する。

static void Main( string[] args ) {

var provider = Observable.Create<DateTime>( o => {
o.OnNext( DateTime.Now );
o.OnError( new ApplicationException( "ERROR " + DateTime.Now.ToString() ) );
o.OnCompleted();
return Disposable.Empty;
} );

provider
.DelayError( TimeSpan.FromSeconds( 3 ) )
.Subscribe(
x => Console.WriteLine( x ) ,
e => Console.WriteLine( $"{e} -> {DateTime.Now}" ) ,
() => Console.WriteLine( "Completed." )
);

Console.ReadLine();
}


実行結果

2015/07/12 10:04:14

System.ApplicationException: ERROR 2015/07/12 10:04:14 -> 2015/07/12 10:04:17


スケジューラの使い方

ISchedulerの拡張メソッドとしてScheduleメソッドが用意されている。

Scheduleメソッドを使用することでスケジュールを登録することができる。

スレッドプールスケジューラ上で3秒後にOnNextを実行するには、次のようにScheduleを呼び出す。


ThreadPoolSchedulerにスケジュールする

ThreadPoolScheduler.Instance.Schedule( TimeSpan.FromSeconds( 3 ) , () => observer.OnNext( 1 ) );



スケジュールの取り消し

Scheduleメソッドは、スケジュールを取り消すために使用するIDisposableを返す。

スケジュールを取り消すには、Scheduleで返されたオブジェクトをDisposeすれば良い。

なおスケジュールは、プロバイダまたは、オペレータがDisposeされた時点で取り消す必要があるので注意が必要。



IDisposableの取り扱い

Rxでは、解除、取り消しといった操作をIDisposableとして統一してある。

System.Reactive.Disposables名前空間には、IDisposableを効率的に扱うためのクラスが用意されている。


Disposable.Create

デリゲート(Action)をIDisposableとして包み込み、Disposeされたときにデリゲートを実行するみの。

IDisposable hoge = Disposable.Create( () => Console.WriteLine( "a" ) );

hoge.Dispose();

a


SerialDisposable

Disposableプロパティにオブジェクトが設定されると、以前のオブジェクトをDisposeして新しい参照を設定する。

var hoge = new SerialDisposable();

hoge.Disposable = Disposable.Create( () => Console.WriteLine( "a" ) );
hoge.Disposable = Disposable.Create( () => Console.WriteLine( "b" ) );
hoge.Dispose();

a

b


SingleAssignmentDisposable

SerialDisposableと似ているがDisposableプロパティを設定できるのは、最初の1回目のみで2回目以降は、例外(InvalidOperationException)が発生する。


BooleanDisposable

IsDisposedプロパティでDisposeが呼ばれたかどうか確認できる。

var hoge = new BooleanDisposable();

Console.WriteLine( hoge.IsDisposed );
hoge.Dispose();
Console.WriteLine( hoge.IsDisposed );

False

True


CancellationDisposable

TokenプロパティでCancellationTokenを取得できる。

Disposeすると、CancellationTokenがキャンセル状態に移行する。

var hoge = new CancellationDisposable();

Console.WriteLine( hoge.Token.IsCancellationRequested );
hoge.Dispose();
Console.WriteLine( hoge.Token.IsCancellationRequested );

False

True

CancellationTokenを受け取るような非同期メソッドをプロバイダやオペレータで呼び出す場合に使用する。


ContextDisposable

指定されたSynchronizationContextを経由してオブジェクトをDisposeします。

なおSynchronizationContext.Postメソッドを使用するので非同期に処理される点に注意しておく必要があります。

※下記のコードは、コンソールでは動きません。WPFかフォームで検証して下さい。

Console.WriteLine( $"Main:{Thread.CurrentThread.ManagedThreadId}" );

var hoge = new ContextDisposable(
SynchronizationContext.Current ,
Disposable.Create( () => Console.WriteLine( $"Disposed:{Thread.CurrentThread.ManagedThreadId}" ) )
);

Task.Run( () => {
Console.WriteLine( $"Task:{Thread.CurrentThread.ManagedThreadId}" );
hoge.Dispose();
} );

Main:9

Task:10
Disposed:9


ScheduledDisposable

指定したスケジューラ上でオブジェクトをDisposeします。

Console.WriteLine( $"Main:{Thread.CurrentThread.ManagedThreadId}" );

var hoge = new ScheduledDisposable(
TaskPoolScheduler.Default ,
Disposable.Create( () => Console.WriteLine( $"Disposed:{Thread.CurrentThread.ManagedThreadId}" ) )
);

hoge.Dispose();

Main:9

Disposed:11


CompositeDisposable

Listのようなもので、IDisposableなオブジェクトをリストに登録してまとめることができます。

var hoge = new CompositeDisposable();

hoge.Add( Disposable.Create( () => Console.WriteLine( "A" ) ) );
hoge.Add( Disposable.Create( () => Console.WriteLine( "B" ) ) );
hoge.Add( Disposable.Create( () => Console.WriteLine( "C" ) ) );
var d = Disposable.Create( () => Console.WriteLine( "D" ) );
hoge.Add( d );
hoge.Remove( d );
Console.WriteLine( "----" );
hoge.Dispose();

D

----
A
B
C

Removeで削除したときに削除されたオブジェクトがDisposeされる。

hoge.Dispose()を呼び出したときに蓄積されていた、すべてのオブジェクトをDisposeする。


RefCountDisposable

参照カウンタ方式でDisposeします。

GetDisposableメソッドでカウンタをインクリメントし、GetDisposableメソッドで返されたIDisposableを

Disposeするとカウンタをデクリメントします。

var hoge = new RefCountDisposable( Disposable.Create( () => Console.WriteLine( "Disposed" ) ) );

var a = hoge.GetDisposable();
var b = hoge.GetDisposable();
var c = hoge.GetDisposable();

hoge.Dispose();
Console.WriteLine( "---" );
a.Dispose();
b.Dispose();
c.Dispose();

---

Disposed

参照カウンタが1以上のときにRefCountDisposable自身をDisposeしようとしても何も起きません。

参照カウンタが0になったときにコンストラクタで渡されたオブジェクトがDisposeされます。

hoge.Dispose()は、参照カウンタに0になったらコンストラクタで登録されたオブジェクトをDisposeするフラグを立てているだけです。


Disposable.Empty

何もしないIDisposableです。

IDisposableを返す必要があるものの、特に何も行う必要がない場合に使用します。

return Disposable.Empty;