いまさら聞けないReactive Extensions.4

  • 22
    Like
  • 2
    Comment
More than 1 year has passed since last update.

Advent Calendarの24日目の記事です。
何を書くか迷った結果、Reactive Extensions関係の記事を書くことにしました。

イベントからストリームを生成する

昔ながらのライブラリなどでは、何らかの通知・監視といった操作にイベントが使われていました。
今もRXを使わないようなライブラリでは、イベントが使われていますし、WPFもフォームもイベントが使われています。

Subscribeすると、イベントハンドラにイベントリスナーが関連付けられます。
Subscribeは、IDisposableを返し、それをDisposeすることでイベントハンドラからイベントリスナーを解除することができます。

イベントが発生すると、イベントデータがOnNextによって伝播され、ストリームに流されます。
SubscribeをDisposeするまでイベントが関連付けられた状態になるため、不要になったタイミングでDisposeを呼び出す必要があります。

イベントのような仕組みのストリームをRXでは、HOTなオブザーバーと呼んでいます。

さてイベントをRXで扱う方法は、簡単です。
以下にそれらのショートコードを記載しておきます。

イベントソース
public delegate void DummyHandler( int x );
public delegate void DummyHandler<T>( T x );
class Sample {
    public event EventHandler<EventArgs> Handler1;
    public event EventHandler Handler2;
    public event Action Handler3;
    public event Action<int> Handler4;
    public event DummyHandler Handler5;
    public event DummyHandler<string> Handler6;
}

イベント登録

Observable.FromEventPatternは、EventHandler系のイベントからストリームを生成するもので、
Observable.FromEventは、それ以外のイベントからストリームを生成します。

各種イベントの登録パターン
var hoge = new Sample();

// EventHandler<EventArgs>
Observable.FromEventPattern( x => hoge.Handler1 += x.Invoke , x => hoge.Handler1 -= x.Invoke )
    .Select( x => x.EventArgs )
    .Subscribe( Console.WriteLine );

// EventHandler
Observable.FromEventPattern( x => hoge.Handler2 += x , x => hoge.Handler2 -= x )
    .Select( x => x.EventArgs )
    .Subscribe( Console.WriteLine );

// Action
Observable.FromEvent( x => hoge.Handler3 += x , x => hoge.Handler3 -= x ).Subscribe( _ => Console.WriteLine( "_" ) );

// Action<int>
Observable.FromEvent<int>( x => hoge.Handler4 += x , x => hoge.Handler4 -= x ).Subscribe( Console.WriteLine );

// DummyHandler
Observable.FromEvent<int>( x => hoge.Handler5 += x.Invoke , x => hoge.Handler5 -= x.Invoke ).Subscribe( Console.WriteLine );

// DummyHandler<string>
Observable.FromEvent<string>( x => hoge.Handler6 += x.Invoke , x => hoge.Handler6 -= x.Invoke ).Subscribe( Console.WriteLine );

EventHandler系以外でも一見、FromEventPatternが使えるように思えますが、実行時に"セキュリティの透過性"といった例外が発生するため注意が必要です。

イベントの解除

イベントを解除するには、Subscribeで受け取ったIDisposableのDisposeを実行します。

イベントに登録と解除
IDisposable resource = 
   Observable.FromEventPattern( x => hoge.Handler1 += x.Invoke , x => hoge.Handler1 -= x.Invoke )
       .Select( x => x.EventArgs )
       .Subscribe( Console.WriteLine );

resource.Dispose();

拡張メソッドを作っておく

例えば、PropertyChangedイベントなどよく使うイベントに関しては、拡張メソッドを用意しておくと便利です。

PropertyChangedイベントを監視するための拡張メソッド
public static IObservable<PropertyChangedEventArgs> AsPropertyChanged<T>( this T source ) 
   where T : INotifyPropertyChanged
     => Observable
          .FromEventPattern<PropertyChangedEventArgs>( 
              x => source.PropertyChanged += x.Invoke , x => source.PropertyChanged -= x.Invoke
          )
          .Select( x => x.EventArgs );

PropertyChangedイベントの発生を監視する

class Sample : INotifyPropertyChanged {
    public event PropertyChangedEventHandler PropertyChanged;
        public int Value { get; set; } // PropertyChangedイベントを発生させたりするところは省略...
}

var s = new Sample();
var resource = s.AsPropertyChanged().Subscribe( x => Console.WriteLine( x.PropertyName ) );
resource .Dispose(); // イベントの解除

特定のプロパティを監視する

あるプロパティが変化したとき、特定の処理を実行したい場面は、よくあることです。

特定のプロパティについてPropertyChangedイベントを監視するための拡張メソッド
public static IObservable<PropertyChangedEventArgs> AsPropertyChanged<T>( this T source , string name ) 
   where T : INotifyPropertyChanged
     => Observable
          .FromEventPattern<PropertyChangedEventArgs>( 
              x => source.PropertyChanged += x.Invoke , x => source.PropertyChanged -= x.Invoke
          )
          .Select( x => x.EventArgs )
          .Where( e => e.PropertyName == name );
Valueプロパティの更新通知を監視する
new Sample().AsPropertyChanged().Subscribe( nameof( Sample.Value ) );

nameofの指定

var hoge = users.Piyo.GetData();

このようにその場で型名がわからないような場合、nameofの指定が少しだけ手間になります。
変数を参照できる場合、下記のように指定すれば良いのですが…

nameof( hoge.Value )

わざわざ一時変数を用意するまでもない場合に下記のような記述を行うことになりますが、
GetDataメソッドの戻り値の型をSampleではなく別の何かに変えた場合、破綻します。
もちろん単純な型名の変更であれば、追従しますが…

GetDataの型が変わるとnameofが破綻するかも知れない
users.Piyo.GetData()
          .AsPropertyChanged( nameof( Sample.Value ) );

そこで次のようにプロパティ名をラムダ式で受け取るオーバーロードを用意しておくとこれを解決できます。

ラムダ式でプロパティ名を受け取る
IObservable<PropertyChangedEventArgs> AsPropertyChanged<T>( this T source , Func<T , string> getPropertyName ) 
 where T : INotifyPropertyChanged
     => Observable
          .FromEventPattern<PropertyChangedEventArgs>( 
              x => source.PropertyChanged += x.Invoke , x => source.PropertyChanged -= x.Invoke
          )
          .Select( x => x.EventArgs )
          .Where( e => e.PropertyName == getPropertyName( default(T) ) );
GetDataの型が変わっても追従する。
users.Piyo.GetData()
          .AsPropertyChanged( x => nameof( x.Value ) );

これの注意点は、このラムダ式は、nameofを受け取るのではなく stringを受け取るということです。
そのため、Valueがstringの場合、次のようにも書けてしまいますので注意が必要です。

nameofの付け忘れに注意する
users.Piyo.GetData()
          .AsPropertyChanged( x => x.Value );

TaskとIObservableの変換

.NET 4.0でTask、.NET 4.5で async/awaitが登場したことで非同期プログラミングが身近なものになりました。
RXが注目され出したのも、ちょうどその頃で、async/awaitにすべきか、RXにすべきで意見が分かれていたように思えます。

ネットワークアクセスなどの重たい、フリーズの可能性がある処理は、非同期で行うのが今日の常識です。
非同期メソッドは、以下のようにTask<T>を用いて定義します。

非同期メソッドの例
Task<int> GetAsync() => Task.Run( () => 100 );

async/awaitが登場したことにより、非同期メソッドを同期メソッドを呼び出すのと同じように記述できるようになりました。

async/awaitを用いた非同期メソッドの実行
async void Hoge(){
   var result = await GetAsync();
   Console.WriteLine( result );
}

IObservable<T>に変換する

Task<T>のような非同期メソッドをソースにストリームを生成するには、次のようにToObservableメソッドを呼び出すだけで、
RXに落とし込むことができます。

using System.Reactive.Threading.Tasks; // 拡張メソッドなので必須

void main( ){
  Task.Run( () => 1000 )
    .ToObservable()
    .Subscribe( Console.WriteLine );
}

Task<T>に変換する

逆にIObservable<T>からTask<T>に変換するには、ToTaskメソッドを使います。

Task<int> task = Observable.Return( 100 ).ToTask();

非同期メソッドの待機

何らかの値を返すストリーム
var observable = Observable.Create<int>( o =>
{
    o.OnNext( 100 );
    o.OnNext( 200 );
    o.OnNext( 300 );
    o.OnCompleted();
    return Disposable.Empty; // 何もしない IDisposable
} );

このようなColdなストリームがあった場合、その値を待機したい場合があります。
OnCompletedが呼ばれると待機が完了し、一番最後にOnNextされた値が返されます。

そのため、OnCompletedが呼ばれないようなストリームに対して待機すると、永遠に待機したままの状態になります。

async/await

async / awaitで待機する方法は、簡単です。

async void Test() {
    Console.WriteLine( await observable );
    // 300
}

Waitメソッドで待機する

Waitメソッドを用いると、その場でストリームの完了を待機できます。
awaitと似ていますが、awaitが非同期なのに対し、Waitメソッドは、同期的に待機することができます。

Console.WriteLine( observable.Wait() );
// 300

値が無いストリームときの規定値

待機操作は、ストリームに1つ以上の値を期待しています。
しかし、ストリームに値がこないケースも十分に考えられるでしょう。

var observable = Observable.Create<int>( o =>
{
    o.OnCompleted();
    return Disposable.Empty;
} );

Console.WriteLine( observable.Wait() );
Console.ReadLine();

このようにOnNextが1つもされずにOnCompletedが呼び出された場合、以下のような例外が発生します。

型 'System.InvalidOperationException' のハンドルされていない例外が System.Reactive.Linq.dll で発生しました
追加情報:Sequence contains no elements.

DefaultIfEmptyメソッドを指定し、規定値(指定しない場合は、型の規定値)を待機結果として取得することができます。

csharp:値が無ければ、規定値( 1000 )を返す
Console.WriteLine( observable.DefaultIfEmpty( 1000 ).Wait() );

タイムアウト

何らかの原因で一定時間経過してもストリームが完了しないことがあるかも知れません。
そうしたとき、いつまでも待ち続けてしまうのは、リソースを無駄でしかありませんし、Waitメソッドで待機している場合、
スレッドをブロックして待機していることになるため、スレッドの無駄使いにもつながります。

3秒待機しても値がこなければエラーにする
var result = observable
    .Timeout( TimeSpan.FromSeconds( 3 ) )
    .Wait();
Console.WriteLine( result );

SelectManyで非同期メソッドを実行する

SelectManyで非同期メソッド(Task<T>)を呼び出すと、非同期メソッドの実行結果を後続に伝播します。

SelectManyで非同期結果を取得する
Observable.Return( 100 )
      .SelectMany( x => Task.Run( () => x * 2 ) )
      .Subscribe( Console.WriteLine );
Select+Switchを使う方法
Observable.Return( 100 )
     .Select( x => Task.Run( () => x * 2 ) )
     .Switch()
      .Subscribe( Console.WriteLine );

Begin~/End~系の非同期メソッド

Task以前、非同期メソッドと言えば、Begin~メソッドとEnd~メソッドのペアでデザインされた、
Asynchronous Programming Model (APM)という手法が使われていました。

TaskやRxが使える現代において、わざわざAPMを使うことはもうありませんが、古いライブラリなどでは、
APMが使用されていることがあるでしょう。

例えば、Socketクラスでの非同期接続は、BeginConnectメソッドとEndConnectメソッドのペアで構成されています。
そのようなAPMな非同期メソッドをRXに落とし込むには、Task.Factory.FromAsyncメソッドを用いて、
一度Taskに変換した後、ToObservableメソッドでIObservable<Unit>に変換します。

APMは、Task化してそこからRx化する
Task.Factory
    .FromAsync( socket.BeginConnect , socket.EndConnect , "127.0.0.1", 80 , null )
    .ToObservable();

これにより、接続が確定した時点で後続に値が伝播されるようになります。
更にRXにすることで次のようにタイムアウト処理などを簡単に挟めるようになります。

タイムアウトを追加
var o = Task.Factory.FromAsync( socket.BeginConnect , socket.EndConnect , "127.0.0.1", 80 , null )
            .ToObservable()
            .Where( _ => socket.Connected )        // 接続されているかチェック
            .Timeout( TimeSpan.FromSeconds( 5 ) );  // 5秒以内に接続されてなければエラー扱いにする

This post is the No.24 article of C# Advent Calendar 2015