みなさんはReactive Extensions(Rx)
って知っていますか?
使ったことはなくても何となく聞いたことはあると思います。
Unityを使っている人ならUniRx
というUnity向けのRxが存在するのでそちらのほうがなじみが深いかもしれません。
Rxは使いこなせば便利な機能ですがいったいどのように実装されているかがすぐにはわかりません。(個人の感想です)
今回は備忘録を兼ねて超シンプルなRxを実装していきます。
そもそもRxを知らない方は以下の記事が参考になります。
こわくないReactive Extensions超入門
ObserverとObservable
Rxの根幹となるインターフェースとしてIObserver
とIObservable
が存在します。
IObservable
はその名の通り観測対象のオブジェクト、
IObserver
は観測を行うオブジェクトです。
これを超シンプルな形でクラスにしたものが以下のものです。
(インターフェースではなく直接クラスにしています。)
class Observer<T>
{
private Action<T> onNext;
public Observer(Action<T> onNext) => this.onNext = onNext;
public void OnNext(T v) => this.onNext.Invoke(v);
}
class Observable<T>
{
private Action<Observer<T>> subscribe;
public Observable(Action<Observer<T>> subscribe) => this.subscribe = subscribe;
public void Subscribe(Observer<T> observer) => this.subscribe(observer);
}
とてもシンプルですね
Observer
、Observable
のどちらのクラスも空行を除けば実質3行しか存在しません。
実際にこのObserver
とObservable
を使用してみましょう
Observer<string> source = null;
var root = new Observable<string>(o => source = o);
var second = new Observable<string>(o =>
{
root.Subscribe(new Observer<string>(s =>
{
Console.WriteLine($"'{s}' が流れてきました。2倍にします。");
o.OnNext(s + s);
}));
});
var third = new Observable<string>(o =>
{
second.Subscribe(new Observer<string>(s =>
{
if (s == "ふがふが")
{
Console.WriteLine("'ふがふが'は先に流しません");
}
else
{
o.OnNext(s);
}
}));
});
third.Subscribe(new Observer<string>(s =>
{
Console.WriteLine($"値:{s}");
}));
Console.WriteLine("値を送信します");
source.OnNext("てすと");
Console.WriteLine("値を送信します(2回目)");
source.OnNext("ふが");
Console.WriteLine("値を送信します(3回目)");
source.OnNext("ほげほげ")
なにかよくわからないコードになりました
やってることはルートとなるObservable
を作っていくつかObservable
を繋げています。
これを実際に実行すると以下のような結果となります。
うまくいっているような気がします
拡張メソッドで拡張する
前のセクションのサンプルだと何かよくわからない状態になっていますが、Rxの本番はここからです。
このObservable
を拡張メソッドを使って拡張していきましょう。
拡張メソッドについて:拡張メソッド - C# によるプログラミング入門
Subscribe
Observable
から流れてくる値を観測するためにいちいちObserver
を作成するのは面倒です。
ラムダ式を渡せばいいように拡張しましょう。
public static void Subscribe<T>(this Observable<T> observable, Action<T> subscribe) =>
observable.Subscribe(new Observer<T>(v => subscribe(v)));
たったの一行です
これを使うとこうなります。
// before
second.Subscribe(new Observer<string>(s =>
{
Console.WriteLine($"値:{s}");
}));
// after
second.Subscribe(s => Console.WriteLine($"値:{s}"));
Select
Observable
から流れてくる値を加工できるようにしましょう。
public static Observable<U> Select<T, U>(this Observable<T> observable, Func<T, U> select)
{
return new Observable<U>(o =>
{
observable.Subscribe(v =>
{
o.OnNext(select(v));
});
});
}
ジェネリクスとラムダ式が多く出てきて難しくなってきました
とにかくこれを使ってみましょう。
// before
var second = new Observable<string>(o =>
{
root.Subscribe(new Observer<string>(s =>
{
Console.WriteLine($"'{s}' が流れてきました。2倍にします。");
o.OnNext(s + s);
}));
});
// after
var second = root.Select(s =>
{
Console.WriteLine($"'{s}' が流れてきました。2倍にします。");
return s + s;
});
劇的な改善です
Where
Observable
から流れてくる値をフィルタできるようにしましょう。
public static Observable<T> Where<T>(this Observable<T> observable, Func<T, bool> where)
{
return new Observable<T>(o =>
{
observable.Subscribe(v =>
{
if (where(v))
{
o.OnNext(v);
}
});
});
}
これも難しいですがSelect
とほぼ同じです。
同様に使ってみます。
// before
var third = new Observable<string>(o =>
{
second.Subscribe(new Observer<string>(s =>
{
if (s == "ふがふが")
{
Console.WriteLine("'ふがふが'は先に流しません");
}
else
{
o.OnNext(s);
}
}));
});
// after
var third = second.Where(s =>
{
if (s == "ふがふが")
{
Console.WriteLine("'ふがふが'は先に流しません");
return false;
}
return true;
});
最初のサンプルに適用する
最初のサンプルを拡張メソッドバージョンで書き直してみましょう。
Observer<string> source = null;
var root = new Observable<string>(o => source = o);
root
.Select(s =>
{
Console.WriteLine($"'{s}' が流れてきました。2倍にします。");
return s + s;
})
.Where(s =>
{
if (s == "ふがふが")
{
Console.WriteLine("'ふがふが'は先に流しません");
return false;
}
return true;
})
.Subscribe(s => Console.WriteLine($"値:{s}"));
Console.WriteLine("値を送信します");
source.OnNext("てすと");
Console.WriteLine("値を送信します(2回目)");
source.OnNext("ふが");
Console.WriteLine("値を送信します(3回目)");
source.OnNext("ほげほげ");
いつものRxです
さらに余分なConsole.WriteLine
を消してみましょう。
root
.Select(s => s + s)
.Where(s => s != "ふがふが")
.Subscribe(s => Console.WriteLine($"値:{s}"));
(色々表示されなくなりましたが)たったの3行になりました!!!
まとめ
今回実装したものはあまりにもシンプルすぎて本家のRxとはほぼ別物です。
(Subscribe
でOnNext
しか使えないとかIDisposable
ではないとか機能が足りなさすぎるとか...)
それでもコア部分のイメージは大体つかめると思います。
実際に書いて動かしてみるとより分かりやすいです。
今回書いたソースコードは以下にあります。
yaegaki/Simple-Rx