LoginSignup
24
24

More than 5 years have passed since last update.

[C#]超シンプルなObservable/Observerを使ったRx

Last updated at Posted at 2018-08-29

みなさんはReactive Extensions(Rx)って知っていますか?
使ったことはなくても何となく聞いたことはあると思います。
Unityを使っている人ならUniRxというUnity向けのRxが存在するのでそちらのほうがなじみが深いかもしれません。
Rxは使いこなせば便利な機能ですがいったいどのように実装されているかがすぐにはわかりません。(個人の感想です:frowning2:)
今回は備忘録を兼ねて超シンプルなRxを実装していきます。

そもそもRxを知らない方は以下の記事が参考になります。
こわくないReactive Extensions超入門

ObserverとObservable

Rxの根幹となるインターフェースとしてIObserverIObservableが存在します。
IObservableはその名の通り観測対象のオブジェクト:full_moon:IObserverは観測を行うオブジェクト:eye:です。

これを超シンプルな形でクラスにしたものが以下のものです。
(インターフェースではなく直接クラスにしています。)

ObserverとObservable
class Observer<T>
{
    private Action<T> onNext;

    public Observer(Action<T> onNext) => this.onNext = onNext;
    public void OnNext(T v) => this.onNext.Invoke(v);
}

class Observable<T>
{
    private Action<Observer<T>> subscribe;

    public Observable(Action<Observer<T>> subscribe) => this.subscribe = subscribe;
    public void Subscribe(Observer<T> observer) => this.subscribe(observer);
}

とてもシンプルですね:vulcan:
ObserverObservableのどちらのクラスも空行を除けば実質3行しか存在しません。

実際にこのObserverObservableを使用してみましょう:point_up:

シンプルなObserverとObservableを使ってみる。
Observer<string> source = null;
var root = new Observable<string>(o => source = o);

var second = new Observable<string>(o =>
{
    root.Subscribe(new Observer<string>(s =>
    {
        Console.WriteLine($"'{s}' が流れてきました。2倍にします。");
        o.OnNext(s + s);
    }));
});

var third = new Observable<string>(o =>
{
    second.Subscribe(new Observer<string>(s =>
    {
        if (s == "ふがふが")
        {
            Console.WriteLine("'ふがふが'は先に流しません");
        }
        else
        {
            o.OnNext(s);
        }
    }));
});

third.Subscribe(new Observer<string>(s =>
{
    Console.WriteLine($"値:{s}");
}));

Console.WriteLine("値を送信します");
source.OnNext("てすと");
Console.WriteLine("値を送信します(2回目)");
source.OnNext("ふが");
Console.WriteLine("値を送信します(3回目)");
source.OnNext("ほげほげ")

なにかよくわからないコードになりました:ghost:
やってることはルートとなるObservableを作っていくつかObservableを繋げています。
これを実際に実行すると以下のような結果となります。

image.png

うまくいっているような気がします:thinking:

拡張メソッドで拡張する

前のセクションのサンプルだと何かよくわからない状態になっていますが、Rxの本番はここからです。
このObservableを拡張メソッドを使って拡張していきましょう。

拡張メソッドについて:拡張メソッド - C# によるプログラミング入門

Subscribe

Observableから流れてくる値を観測するためにいちいちObserverを作成するのは面倒です。
ラムダ式を渡せばいいように拡張しましょう。

Subscribe
public static void Subscribe<T>(this Observable<T> observable, Action<T> subscribe) =>
    observable.Subscribe(new Observer<T>(v => subscribe(v)));

たったの一行です:clap:

これを使うとこうなります。

// before
second.Subscribe(new Observer<string>(s =>
{
    Console.WriteLine($"値:{s}");
}));

// after
second.Subscribe(s => Console.WriteLine($"値:{s}"));

Select

Observableから流れてくる値を加工:pick:できるようにしましょう。

Select
public static Observable<U> Select<T, U>(this Observable<T> observable, Func<T, U> select)
{
    return new Observable<U>(o =>
    {
        observable.Subscribe(v =>
        {
            o.OnNext(select(v));
        });
    });
}

ジェネリクスとラムダ式が多く出てきて難しくなってきました:thermometer_face:
とにかくこれを使ってみましょう。

// before
var second = new Observable<string>(o =>
{
    root.Subscribe(new Observer<string>(s =>
    {
        Console.WriteLine($"'{s}' が流れてきました。2倍にします。");
        o.OnNext(s + s);
    }));
});

// after
var second = root.Select(s =>
{
    Console.WriteLine($"'{s}' が流れてきました。2倍にします。");
    return s + s;
});

劇的な改善です:star:

Where

Observableから流れてくる値をフィルタ:no_entry:できるようにしましょう。

Where
public static Observable<T> Where<T>(this Observable<T> observable, Func<T, bool> where)
{
    return new Observable<T>(o =>
    {
        observable.Subscribe(v =>
        {
            if (where(v))
            {
                o.OnNext(v);
            }
        });
    });
}

これも難しいですがSelectとほぼ同じです。
同様に使ってみます。

// before
var third = new Observable<string>(o =>
{
    second.Subscribe(new Observer<string>(s =>
    {
        if (s == "ふがふが")
        {
            Console.WriteLine("'ふがふが'は先に流しません");
        }
        else
        {
            o.OnNext(s);
        }
    }));
});

// after
var third = second.Where(s =>
{
    if (s == "ふがふが")
    {
        Console.WriteLine("'ふがふが'は先に流しません");
        return false;
    }

    return true;
});

:heart_eyes::heart_eyes::heart_eyes:

最初のサンプルに適用する

最初のサンプルを拡張メソッドバージョンで書き直してみましょう。

拡張メソッドを使ったサンプル1
Observer<string> source = null;
var root = new Observable<string>(o => source = o);

root
    .Select(s =>
    {
        Console.WriteLine($"'{s}' が流れてきました。2倍にします。");
        return s + s;
    })
    .Where(s =>
    {
        if (s == "ふがふが")
        {
            Console.WriteLine("'ふがふが'は先に流しません");
            return false;
        }

        return true;
    })
    .Subscribe(s => Console.WriteLine($"値:{s}"));


Console.WriteLine("値を送信します");
source.OnNext("てすと");
Console.WriteLine("値を送信します(2回目)");
source.OnNext("ふが");
Console.WriteLine("値を送信します(3回目)");
source.OnNext("ほげほげ");

いつものRxです:v:
さらに余分なConsole.WriteLineを消してみましょう。

拡張メソッドを使ったサンプル2
root
    .Select(s => s + s)
    .Where(s => s != "ふがふが")
    .Subscribe(s => Console.WriteLine($"値:{s}"));

(色々表示されなくなりましたが)たったの3行になりました!!!

まとめ

今回実装したものはあまりにもシンプルすぎて本家のRxとはほぼ別物です。
(SubscribeOnNextしか使えないとかIDisposableではないとか機能が足りなさすぎるとか...)

それでもコア部分のイメージは大体つかめると思います。
実際に書いて動かしてみるとより分かりやすいです。

今回書いたソースコードは以下にあります。
yaegaki/Simple-Rx

24
24
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
24