はじめに

Rxってなにそれおいしいの?といったレベル感の方を対象に書きます。

Rxは概念が難しく、筆者自身調べれば調べるほどわからなくなったので、自分が理解したことをできるだけわかりやすく解説します。


Rx(Reactive Extensions)とは

非同期/イベント/時間に関する処理を、LINQ形式で簡潔かつ宣言的に記述することができるC#のさいきょうライブラリのことです。

eventとかLINQの機能を盛り込んで高機能にしたすごいやつ、それがRxです。

Rxの概念があまりに秀逸であったため、ReactiveXとして多言語(Java/JavaScript/Unity(C#)/Scala/C++/Ruby/Python/Go/Kotlin/Swift/PHP etc...)に移植され言語を超えて使用されています。


Rxの根幹となる考え方

RxはObserverパターンが核となる考え方です。

Observerパターンとは、観測対象(=Subject)観測者(=Observer)の2つの役割が存在し、Subjectの状態が変化した際にObserverに通知されるデザインパターンです。そのため、状態変化に応じた処理を記述する時に有効です。

このとき重要なのは、SubjectはObserver(の型)を直接知らないようにすることです。この抽象化を実現するために、一般的にはインターフェースが利用されます。.NET Framework 4では、SubjectのインターフェースとしてIObservable<T>が、ObserverのインターフェースとしてIObserver<T>が導入されました。


◇ IObservable<T> : 観測対象(=Subject)

IObservable<T>はSubscribeメソッドのみを持ちます。

1. IDisposable Subscribe(IObserver<T> observer)メソッド

引数で受け取ったObserverにイベントの通知を行うようにします。

つまり、Subjectに通知先(=Observer)を登録します。

戻り値のIDisposableのDisposeメソッドを呼び出すと通知を取り消します。ジェネリック型Tには、変更通知に用いる型を指定します。


IObservable<T>インターフェース


public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}


◇ IObserver<T> : 観測者(=Observer)

IObserver<T>には、IObservable<T>から状態変更を通知してもらうための3つのメソッドがあります。

1. void OnNext(T value)メソッド

IObservable<T>から発生した通知を受け取って処理を行います。

つまり、Subjectの状態が変化したことを通知します。

2. void OnError(Exception ex)メソッド

IObservable<T>で発生した例外を受け取って処理を行います。

つまり、Subjectで何らかのエラーが発生したことを通知します。

3. void OnCompleted()メソッド

IObservable<T>からの通知が終了した時の処理を行います。

つまり、Subjectの状態変化が完了したことを通知します。


IObserver<T>インターフェース


public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}


ストリーム(Stream) == Subject

Rxではストリームと呼ばれるものを扱います。ストリームとはデータの流れるシーケンスであり、時間軸も存在しています。

ストリームの実態はSubject(IObservable<T>)です。ちなみにストリームに流れるデータの事をメッセージと呼びます。

ここで、Rxの挙動を追いかける上で非常に重要である、データの流れについて見ていきます。


データの流れ方の違い : Push型/Pull型


◇ Push型 : IObservable<T>

Push型は、発信源から情報が送信されてくるのを受信者が待機する(受動的に待ち受ける)という形です。

情報の発信源に対して複数の受信者を関連付けることができるので、同じ情報を一度に複数の対象者に配ることができます。


◇ Pull型 : IEnumerable<T>

Pull型は、受信者が必要に応じて情報を発信源に取得しに行く(能動的に取得する)という形です。

コレクションの中から特定の条件に合う要素を抜き出してどうこうする、というIEnumerable<T>ベースのLINQスタイルは、まさにPull型と言えます。実際、foreach文などでコレクションからデータを取得する際には、要素を取得する必要があるときになって初めて、MoveNextメソッドCurrentプロパティを利用して逐次アクセスするようになっています。


IObservable<T>の性質 : Hot / Cold


◇ Hot Observable

Hotな性質とは、同一のIObservable<T>シーケンスに関連付けたすべてのIObserver<T>に対し、一度に同じ値を通知することを言います。

Rxのストリームは、基本的にSubscribeされた瞬間に各オペレータの動作が始まるようになっています。ですが、Hot Observableをストリームの途中に挟むことで、Subscribeを実行するより前にストリームを稼働させることができます。また、ストリームを分岐することもできます。


◇ Cold Observable

Coldな性質とは、IObservable<T>シーケンスに関連付けたそれぞれのIObserver<T>に対し、個別に値を通知することを言います。

Cold Observableは、Subscribeされる(またはHot変換される)まで動作しません。つまり、自発的に何もしない受動的なObservableです。


RxでHello worldをやってみる

みんなだいすきHello worldをRxでやってみたいと思います。

一番シンプルなRxのHello worldは、単純なObserverパターンになります。ここで肝になるインターフェースはIObservable<T>とIObserver<T>の2つです。


IObservable<T>の実装

internal class MyObservable : IObservable<string>

{
private ICollection<IObserver<string>> collection = new List<IObserver<string>>();

public IDisposable Subscribe(IObserver<string> observer)
{
collection.Add(observer);
// 一度追加したObserverの削除には対応しない
return null;
}

// 自分をSubscribeしてるObserverに変更を通知する
public void Notify(string value)
{
foreach (var observer in collection)
{
observer.OnNext(value);
}
}
}



IObserver<T>の実装

internal class MyObserver : IObserver<string>

{
// 変更通知を受け取るメソッド
public void OnNext(string value)
{
Console.WriteLine("OnNext : {0}", value);
}

public void OnError(Exception error)
{
Console.WriteLine("OnError");
}

public void OnCompleted()
{
Console.WriteLine("OnCompleted");
}
}



Mainメソッド

class RxHelloWorld1

{
static void Main(string[] args)
{
var myObservable = new MyObservable();
myObservable.Subscribe(new MyObserver());

myObservable.Notify("Hello");
myObservable.Notify("World");
}
}



実行結果

OnNext : Hello

OnNext : World

こんな感じでRxのHello worldを実装することができました。

ただ、わざわざインターフェースを実装したり、非常にまどろっこしいです。


良い感じにHello worldを書き換えてみる

なので、Rxで提供されている便利なSubject<T>クラスを利用して、上記のHello worldを書き直してみたいと思います。

Subject<T>は、IObservable<T>とIObserver<T>の両方を実装した、言わば一人二役なクラスです。 Subscribeメソッドも実装してあるし、OnNextメソッドを呼ぶことで変更を通知することができます。


Mainメソッド

class RxHelloWorld2

{
static void Main(string[] args)
{
// IObservable<T>
var subject = new Subject<string>();

// IObserverをわざわざ定義しなくてもラムダ式でOK
var d = subject.Subscribe(str => Console.WriteLine(str));

// subject(Observable)から変更通知
subject.OnNext("Hello");
subject.OnNext("World");

// Subscribeで返されるIDisposableのDisposeを呼ぶと、Observableへの登録を解除できる
d.Dispose();

// OnNextをしてもWriteLineされない
subject.OnNext("Rx完全に理解した");
}
}



実行結果

Hello

World

おや、"Rx完全に理解した"がコンソールに出力されていませんね。実はIObservableが実装するSubscribeメソッドが返すIDisposableのDisposeを呼ぶと、Subscribeで追加したものを削除する効果があります。

このように、非常に簡単にObserverパターンが作れるようになりました。


フィルタリングする : Where

LINQでお馴染みのWhereメソッドを使ってフィルタリングをかけることができます。

class Filtering

{
static void Main(string[] args)
{
var subject = new Subject<int>();

// subjectから発行された値が3の場合のみ、SubscribeしているIObserver<T>に通知する
subject.Where(i => i == 3)
.Subscribe(i => Console.WriteLine("OnNext : {0}", i));

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
}
}


実行結果

OnNext : 3



値を変換する : Select

LINQでお馴染みのSelectメソッドを使って値を変換することができます。

class Convert

{
static void Main(string[] args)
{
// ObservableのRangeメソッド
// 第一引数に指定した値から1ずつ増やした値を第二引数で指定した数だけ返す
Observable.Range(1, 3)
.Select(i => i * i) // 値を2乗する
.Subscribe(
i => Console.WriteLine("OnNext : {0}", i),
() => Console.WriteLine("Completed"));
}
}


実行結果

OnNext : 1

OnNext : 4
OnNext : 9
Completed


Rxで提供されている機能

RxはIObservable<T>IObserver<T>をベースに下記のような機能を提供しています。

1. IObservable<T>のファクトリメソッド

RxにはIObservable<T>を返すファクトリメソッドが多数用意されています。.NETの標準のイベントからIObservable<T>を生成するメソッドや、非同期呼び出し、タイマー、シーケンス、特定のルールで生成される値の集合etc...さまざまなものが提供されています。

2. IObservable<T>の拡張メソッド

IObservable<T>とIObserver<T>だけではイベントの発行と購読の関係にしかなりません。Rxでは、ここにLINQスタイルの拡張メソッドを提供することでIObservable<T>から発行された値をフィルタリングしたり、発行された値を元に別の処理を行ったり、発行された値の変換を行ったりすることが出来ます。

3. IObserver<T>生成へのショートカット

IObserver<T>を実装しなくても、ラムダ式からIObserver<T>を内部的に生成してくれます。そのため、実際にRxを使用するときには、IObserver<T>インターフェースを実装するケースはほとんどありません。

4. 柔軟なスレッドの切り替え機能

IObservable<T>から発行された値に対する処理を何処のスレッドで実行するのか柔軟に切り替える機能が提供されています。このためバックグラウンドで時間のかかる処理を行い、UIスレッドに切り替えて画面の更新を行うといった処理が簡単に行えるようになります。


まとめ

Rxについてまとめるのは正直無理ゲーです。まじでわからん。笑

Rxなら様々な処理をよしなに書くことができます。

イベント / 並列 / 非同期 / 時間 / 合成 / 分配 / キャンセル / 例外 / 連携 etc... なんでもござれ。

もっとRxについて勉強したい方は下記リンクからどうぞ!(激つよな方々が解説されています)


とても参考になるサイト