UniRx入門 その1

  • 191
    いいね
  • 2
    コメント

UniRx入門シリーズ 目次はこちら


まえがき

私は過去にUniRxについての記事を散発的に書いていましたが、体系に学習できる形になっていませんでした。
そこで1から読んでいけば体系的にUniRxを学習できる資料を作ろうと思い、本記事を書くことにしました。

(余裕があるときに書くことになるので、不定期連載になるかと思いますがよろしくお願いします)

0.はじめに「UniRxとは?」

UniRxとは、neueccさんが作成されているReactive Extensions for Unityなライブラリです。

Reactive Extensions(以下Rx)は、要点だけ箇条書きすると次のようなライブラリとなっています。

  • MicrosoftResearchが開発していたC#向け非同期処理向けライブラリ
  • デザインパターンの1つ、Observerパターンをベースに設計されている
  • 時間に関係した処理や、実行タイミングが重要となる処理を簡単に記述できるようになっている
  • 完成度が高く、Java,JavaScript,Swiftなど様々な言語に移植されている

UniRxはこのRxをベースにUnityへ移植されたライブラリであり、本家.NET Rxと比べ以下のような違いがあります。

  • UnityのC#に最適化されて作成されている
  • Unity開発で便利な機能やオペレータが追加実装されている
  • ReactiveProperty等が追加されている
  • パフォーマンスチューニングが徹底されており、元の.NET Rxよりもメモリパフォーマンスが良い

初めてこの記事を読む方にとって上記説明ではおそらくよくわからないかと思いますので、ここは流して具体的な解説に入って行きたいと思います。

1.「eventとUniRx」

event

みなさんはC#の標準機能の」1つ、「event」を利用されたことはありますでしょうか?
eventは何かのタイミングでメッセージを通知し、別の場所に書いた処理を実行させることができる機能です。

event例(eventを発行する側)
using System.Collections;
using UnityEngine;

/// <summary>
/// 100からカウントダウンし値を通知するサンプル
/// </summary>
public class TimeCounter : MonoBehaviour
{
    /// <summary>
    /// イベントハンドラ(イベントメッセージの型定義)
    /// </summary>
    public delegate void TimerEventHandler(int time);

    /// <summary>
    /// イベント
    /// </summary>
    public event TimerEventHandler OnTimeChanged;


    void Start()
    {
        //タイマ起動
        StartCoroutine(TimerCoroutine());
    }

    IEnumerator TimerCoroutine()
    {
        //100からカウントダウン
        var time = 100;
        while (time > 0)
        {
            time--;
            //イベント通知
            OnTimeChanged(time);

            //1秒待つ
            yield return new WaitForSeconds(1);
        }
    }
}
event例(eventを受ける側)
using UnityEngine;
using UnityEngine.UI;

public class TimerView : MonoBehaviour
{
    //それぞれインスタンスはインスペクタビューから設定

    [SerializeField] private TimeCounter timeCounter; 
    [SerializeField] private Text counterText; //uGUIのText

    void Start()
    {
        //タイマのカウンタが変化したイベントを受けてuGUI Textを更新する
        timeCounter.OnTimeChanged += time => // =>は「ラムダ式」と呼ばれる匿名関数の記法
        {
            //現在のタイマ値をUIに反映する
            counterText.text = time.ToString();
        };
    }
}

上記のコードはタイマをカウントダウンしてその数値をイベントとして通知、イベントを受ける側でuGUI Textを更新するサンプルコードとなっています。

UniRx

なぜ突然eventの話をしたかというと、UniRxはeventの完全上位互換であり、eventに比べてより柔軟な記述ができる」からです。

先ほどのコードはUniRxを利用することで次のように書き換えることができます。

イベント発行側
using System.Collections;
using UniRx;
using UnityEngine;

/// <summary>
/// 100からカウントダウンし、Debug.Logにその値を表示するサンプル
/// </summary>
public class TimeCounter : MonoBehaviour
{
    //イベントを発行する核となるインスタンス
    private Subject<int> timerSubject = new Subject<int>();

    //イベントの購読側だけを公開
    public IObservable<int> OnTimeChanged
    {
        get { return timerSubject; }
    } 

    void Start()
    {
        StartCoroutine(TimerCoroutine());
    }

    IEnumerator TimerCoroutine()
    {
        //100からカウントダウン
        var time = 100;
        while (time > 0)
        {
            time--;

            //イベントを発行
            timerSubject.OnNext(time);

            //1秒待つ
            yield return new WaitForSeconds(1);
        }
    }
}
イベント購読側
using UnityEngine;
using UnityEngine.UI;
using UniRx;

public class TimerView : MonoBehaviour
{
    //それぞれインスタンスはインスペクタビューから設定

    [SerializeField] private TimeCounter timeCounter;
    [SerializeField] private Text counterText; //uGUIのText

    void Start()
    {
        //タイマのカウンタが変化したイベントを受けてuGUI Textを更新する
        timeCounter.OnTimeChanged.Subscribe(time =>
        {
            //現在のタイマ値をUIに反映する
            counterText.text = time.ToString();
        });
    }
}

上記が先ほどのeventで実装していたコードをUniRxで置き換えたものとなります。

eventの代わりにSubjectというクラスが、イベントハンドラの登録時にSubscribeというメソッドが登場しています。つまり、UniRxとは
Subjectがイベント機構の中核となり、Subjectに値を渡す(OnNext)と、Subjectの購読(Subscribe)者にメッセージ渡すことができるシステムとなっているということがわかるかと思います。

では次のセクションで、「OnNextとSubscribe」について詳しく説明したいと思います。

2.「OnNextとSubscribe」

OnNextとSubscribeはともに「Subjectに実装されたメソッド」であり、それぞれ以下の様な挙動となっています。

  • Subscribe : メッセージの受け取り時に実行するを関数を登録する
  • OnNext : Subscribeで登録された関数にメッセージを渡して実行する

サンプルとして、次のコードをご覧ください。

OnNextとSubscribe
//Subject作成
Subject<string> subject = new Subject<string>();

//3回Subscribe
subject.Subscribe(msg => Debug.Log("Subscribe1:" + msg));
subject.Subscribe(msg => Debug.Log("Subscribe2:" + msg));
subject.Subscribe(msg => Debug.Log("Subscribe3:" + msg));

//イベントメッセージ発行
subject.OnNext("こんにちは");
subject.OnNext("おはよう");
実行結果
Subscribe1:こんにちは
Subscribe2:こんにちは
Subscribe3:こんにちは
Subscribe1:おはよう
Subscribe2:おはよう
Subscribe3:おはよう

このように、「Subscribeとはメッセージの受け取り時の処理を登録する処理」「OnNextはメッセージを発行してSubscribeで登録された処理を順番に呼び出していく処理」であることがわかるかと思います。

1.png
2.png
3.png

今回は具体的にSubjectの挙動に焦点を当てて、UniRxの基本的な動作の仕組みと考え方を説明しました。
では、次のセクションで、より抽象度を挙げた話、「IObserverインターフェースIObservableインターフェース」の説明をしたいと思います。

4.「IObserverとIObservable」

先ほど、Subjectに「OnNext」と「Subscribe」の2つのメソッドが実装されていると説明しました。
これは実は端折った説明であって、正確な説明ではありません。

より正しい説明をすると「SubjectはIObserverインターフェースとIObservableインターフェースの2つを実装している」という表現となります。
では、このIObserverインターフェースとIObservableインターフェースとは一体何なのか?詳しく説明したいと思います。

IObserverインターフェース

IObserverインターフェース(以降IObserver)とは、Rxにおける「イベントメッセージを発行できる」というふるまいを定義したインターフェースとなっています。定義は次のようになっています。

IObserver
using System;

namespace UniRx
{
    public interface IObserver<T>
    {
        void OnCompleted();
        void OnError(Exception error);
        void OnNext(T value);
    }
}

※ソースコードはこちらから引用

IObserverは見ての通り、「OnNext」「OnError」「OnCompleted」の3つのメソッド定義のみを持つすごくシンプルなインターフェースとなっています。先ほどまで利用していたOnNextメソッドは、実はこのIObserverで定義されていたメソッドでした。

OnErrorは「発生したエラー(Exception)を通知するメッセージを発行するメソッド」であり、OnCompletedは「メッセージの発行が完了したことを通知するメソッド」となっています。
OnError、OnCompletedについては次回以降に掘り下げたいと思いますので、今回はとりあえず「OnNext」「OnError」「OnCompleted」の3つのメソッド(メッセージ)が用意されているといことを覚えて下さい。

IObservableインターフェース

IObservableインターフェース(以降IObservable)とは、Rxにおける「イベントメッセージを購読できる」というふるまいを定義したインターフェースとなっています。

IObservable
using System;

namespace UniRx
{
    public interface IObservable<T>
    {
        IDisposable Subscribe(IObserver<T> observer);
    }
}

※ソースコードはこちらから引用

こちらはもっとシンプルで、Subscribeメソッドがただ1つ定義されているだけです。
つまり、先ほど呼び出していたSubscribeは元を辿ればこのIObservableに定義されたSubscribeを実行してたということを覚えておいて下さい。

(補足) Subscribeの省略呼び出し

さて、ここで先程のSubjectを利用した時のコードを見てみましょう。

シンプルなSubscribe
subject.Subscribe(msg => Debug.Log("Subscribe1:" + msg));

ここで違和感を覚えた方はいますでしょうか?そう、IObservableで定義されたSubscribeは引数にIObserverを取るのに、このSubscribeの書き方では引数に無名関数を受けています。どうしてこのようなことが可能になっているかというと、OnNext,OnError,OnCompletedの3つのメッセージのうち、必要なメッセージのみを利用できるSubscribeの省略呼び出し用の拡張メソッドがIObservableに用意されているからです。(拡張メソッドの実際の定義はこちらを参照

つまり、先ほどの
subject.Subscribe(msg => Debug.Log("Subscribe1:" + msg));
はSubscribeの複数ある省略記法のうちの1つ、「OnNextのみを利用する場合の省略記法」を利用していただけにすぎないのです。
実際にUniRxを利用する際はこの省略記法を使う場合が殆どであり、Subscribe(IObserver<T> observer)を呼び出すことは殆どありません。

Subscribeの省略記法例
//OnNextのみ
subject.Subscribe(msg => Debug.Log("Subscribe1:" + msg));

//OnNext & OnError
subject.Subscribe(
    msg => Debug.Log("Subscribe1:" + msg),
    error => Debug.LogError("Error" + error));

//OnNext & OnCompleted
subject.Subscribe(
    msg => Debug.Log("Subscribe1:" + msg),
    () => Debug.Log("Completed"));

//OnNext & OnError & OnCompleted
subject.Subscribe(
    msg => Debug.Log("Subscribe1:" + msg),
    error => Debug.LogError("Error" + error),
    () => Debug.Log("Completed"));

Subjectの定義を確認

ここで、さっきまで使っていたSubjectクラスの定義を見てみましょう。

Subjectのクラス定義の冒頭
namespace UniRx
{
    public sealed class Subject<T> : ISubject<T>, IDisposable, IOptimizedObservable<T> {/*実装は省略*/}
}

Subjectはいくつかインターフェースを実装しているようですが、このうちの1つISubject<T>の定義も見てみましょう。

ISubject
namespace UniRx
{
    public interface ISubject<TSource, TResult> : IObserver<TSource>, IObservable<TResult>
    {
    }

    public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T>
    {
    }
}

SubjectはIObservableとIObserverの2つを実装している、つまり、Subjectは「値を発行する」「値を購読できる」という2つの機能を持ったクラスであるということが定義からも確認することができました。

まとめ

2.png

5.「オペレータ」

さて、先ほど「SubscribeはIObservableに実装されているものを(直接/間接的に)呼び出している」という説明をしました。
実はこれ、言い換えると「IObservableであれば何でもSubscribeすることができる」という意味になります。つまり、相手がSubjectかどうかなんて関係なく、IObservableでさえあればSubscribeできちゃうということになります。

この仕組みを応用し、以下の様なことを考えてみましょう。

考察:イベントメッセージをフィルタリングしてみる

例えば、以下の様な実装があったとします

プレイヤが触れたGameObjectのTagを発行する、みたいな例
//文字列を発行するSubject
Subject<string> subject = new Subject<string>();

//文字列をコンソールに表示
subject.Subscribe(x => Debug.Log(string.Format("プレイヤが{0}に衝突しました", x)));

//イベントメッセージ発行
//プレイヤが触れたオブジェクトのTagが発行されている、みたいな想定
subject.OnNext("Enemy");
subject.OnNext("Wall");
subject.OnNext("Wall");
subject.OnNext("Enemy");
subject.OnNext("Enemy");
結果
プレイヤがEnemyに衝突しました
プレイヤがWallに衝突しました
プレイヤがWallに衝突しました
プレイヤがEnemyに衝突しました
プレイヤがEnemyに衝突しました

さて、上記のようなイベントが発行されるSubjectがあったときに「Enemyに触れた時だけ出力したい!」となりました。
この程度のことなら愚直にSubscribe内にif文を書いてもいいのですが、それでは話が進まないので別の方法を考えてみることにしましょう。

SubjectとSubscribeの間に「フィルタする処理」を挟めるのでは?

先ほども言いましたが、値を発行する処理はIObserver、値を購読する処理はIObservableにそれぞれ定義されていました。
ということは、「この2つが実装されたクラスをSubjectとSubscribeのに挟み、そこにフィルタリング処理を書いてしまうことができるのでは?」ということになります。

3.png
4.png

この様な発想のもと、大本のSubjectと末端のSubscribeの間に挟み込んでいろいろメッセージを処理する部品のことをUniRxでは「オペレータ」と呼びます。

フィルタリングするオペレータ「Where」

UniRxには様々なオペレータがたくさん用意されており、殆どの場合は既存のオペレータを組み合わせるだけで済むようになっています。今回はそのうちの1つ、メッセージのフィルタリングを行う「Where」を使ってメッセージがEnemyの場合のみをフィルタリングするようにしてみましょう。

フィルタリングオペレータ「Where」
//文字列を発行するSubject
Subject<string> subject = new Subject<string>();

subject.
  .Where(x => x == "Enemy") //←フィルタリングオペレータ
  .Subscribe(x => Debug.Log(string.Format("プレイヤが{0}に衝突しました", x)));

//イベントメッセージ発行
//プレイヤが触れたオブジェクトのTagが発行されている、みたいな想定
subject.OnNext("Wall");
subject.OnNext("Wall");
subject.OnNext("Enemy");
subject.OnNext("Enemy");
結果
プレイヤがEnemyに衝突しました
プレイヤがEnemyに衝突しました

このように、WhereオペレータをIObservbleとIObserverの間に差し込んであげるだけで、メッセージのフィルタリングができるようになりました。

様々なオペレータ

UniRxには、Where以外にもたくさんのオペレータが用意されています。その一部を紹介すると、

  • フィルタリングする「Where」
  • メッセージ変換する「Select]
  • 重複を排除する「Distinct」
  • 一定個数まとまるまで待つ「Buffer」
  • 短時間にまとめてきた場合に先頭のみを使う「ThrottleFirst」

などが挙げられます。

上記オペレータは本当にたくさんあるオペレータのうちのほんの一部に過ぎません。
UniRxが提供する全オペレータ一覧は別記事にまとめてあるのでそちらのページをご覧ください。

6.「ストリーム」

さて、今まで「SubjectをSubscribeする」や「Subjectにオペレータを挟んでSubscribeする」といった表現を使ってきました。流石に毎度毎度この様な言い方をしているとツライので、これらを表現する単語として「ストリーム」を紹介しておきます。

UniRxにおける「ストリーム」とは、「メッセージが発行されてからSubscribeに到達するまでの一連の処理の流れ」を表現するワードとなっています。
「オペレータを組み合わせてストリームを構築する」「Subscribeしてストリームを稼働させる」「OnCompletedを発行してストリームを停止させる」などの使い方をします。

今後、この「ストリーム」という表現は多用するので覚えておきましょう。
5.png

今回のまとめ

  • UniRxの核はSubjectクラス
  • SubjectをSubscribeするのが基礎の基礎
  • ただし実際に利用するときはIObserverとIObservableインターフェイスのみを意識すれば良い
  • イベントの発行・購読が分離していることを活用し、イベント処理を柔軟にできるようにしたものがオペレータ
  • オペレータを連結して作った一連のイベント処理の流れを「ストリーム」と呼ぶ

次回は今回説明できなかった「OnError」「OnCompleted」「Dispose」あたりの説明をしたいと思います。


おまけ:Whereオペレータを自作してみる

本文中で「IObserverとIObservableが実装されたクラスをSubjectとSubscribeのに挟み、そこにフィルタリング処理を書くことでフィルタができる」と解説し、実際にそれを行ってくれる「Where」オペレータを紹介しました。

でも、本当にそんなことができるのか?実際に同じような挙動をするフィルタリングオペレータを自分で定義してみることにしましょう。

自作のフィルタリングオペレータ本体

自作フィルタリングオペレータ
/// <summary>
/// フィルタリングオペレータ
/// </summary>
public class MyFilter<T> : IObserver<T>, IObservable<T>, IDisposable
{
    private IObserver<T> _observer;
    private bool _isDisposed = false;
    private object lockObject;

    //判定式
    private Func<T, bool> _conditionalFunc;

    public MyFilter(Func<T, bool> conditionalFunc)
    {
        lockObject = new object();
        this._conditionalFunc = conditionalFunc;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        //Subscribeされたら、observerを登録し自身を返却
        _observer = observer;
        return this;
    }

    public void OnNext(T value)
    {
        lock (lockObject)
        {
            if (_isDisposed) return;
            if (_observer == null) return;
            try
            {
                //条件を満たす場合のみOnNextを通過
                if (_conditionalFunc(value))
                {
                    _observer.OnNext(value);
                }
            }
            catch (Exception e)
            {
                //途中でエラーが発生したらエラーを送信
                _observer.OnError(e);
                _isDisposed = true;
                _observer = null;
            }
        }
    }

    public void OnError(Exception error)
    {
        lock (lockObject)
        {
            if (_isDisposed) return;

            //エラーを伝播して停止
            _observer.OnError(error);
            _isDisposed = true;
            _observer = null;
        }
    }

    public void OnCompleted()
    {
        lock (lockObject)
        {
            if (_isDisposed) return;

            //停止
            _observer.OnCompleted();
            _isDisposed = true;
            _observer = null;
        }
    }

    //自身のSubscribeが中止されたら実行される
    public void Dispose()
    {
        lock (lockObject)
        {
            _isDisposed = true;
        }
    }
}

マルチスレッドで利用することを想定したlockが入っていたり、例外発生時の処理が入っていたりしてちょっとごちゃごちゃしていますが、一番重要なのはOnNext中の次の箇所です。

OnNext中の一部
//条件を満たす場合のみOnNextを通過
if (_conditionalFunc(value))
{
    _observer.OnNext(value);
}

SourceとなっているObservableからOnNextが発行されたらそのメッセージ値を評価し、条件を満たしていたら自身のSubscriberにそれを伝えてあげる処理となっています。

使い勝手を考慮して拡張メソッドを作成

このままだとオペレータを使うたびにいちいちインスタンス化してあげないといけなくて使い勝手が悪いので、オペレータチェーンでこのFilterを挟み込めるように拡張メソッドを用意してあげます。

Filterを呼び出す拡張メソッド
public static class ObservableOperators
{
    public static IObservable<T> FilterOperator<T>(this IObservable<T> source, Func<T, bool> conditionalFunc)
    {
        var filter = new MyFilter<T>(conditionalFunc);
        source.Subscribe(filter);
        return filter;
    }
}

自分で作ったオペレータを動かしてみる

これで準備は整ったので、このオペレータを実際に使ってみましょう。

自作のフィルタリングオペレータを使ってみる
//文字列を発行するSubject
Subject<string> subject = new Subject<string>();

//filterを挟んでSubscribeしてみる
subject
    .FilterOperator(x => x == "Enemy")
    .Subscribe(x => Debug.Log(string.Format("プレイヤが{0}に衝突しました", x)));

//イベントメッセージ発行
//プレイヤが触れたオブジェクトのTagが発行されている、みたいな想定
subject.OnNext("Wall");
subject.OnNext("Wall");
subject.OnNext("Enemy");
subject.OnNext("Enemy");
結果
プレイヤがEnemyに衝突しました
プレイヤがEnemyに衝突しました

想定通り、メッセージがフィルタリングされ"Enemy"のみが通過するようになりました。

このように、「IObserverとIObservableの2つを実装したクラスを用意して間に挟んで上げれば、オペレータとして動作してメッセージの処理ができる」ということが理解して頂けたかと思います。

その2