C#
Unity
UniRx

UniRx入門 その3 -ストリームソースの作り方

UniRx入門シリーズ 目次はこちら


0.前回のおさらい

前回、OnNext,OnError,OnCompledおよび、IDisposableの用途について説明し、ストリームの寿命管理の方法についても解説しました。
今回は「ストリームソースの作り方」についてざっくりと紹介したいと思います。

1.ストリームのソース(メッセージの発行元)とは

UniRxにおけるストリームとは、次の3つで構成されています。

  1. メッセージを発行するソースとなるもの(Subjectなど)
  2. メッセージを伝播するオペレータ(Where,Selectなど)
  3. メッセージを購読するもの(Subscribe)

特にUniRxを使い始めて間もない人は、「1」のストリームソースを用意する方法がわからず途方に暮れてしまうパターンがあるかと思います。今回はこのストリームの発端となるストリームソースの作り方を紹介したいと思います。

2.ストリームソースとなりうるもの一覧

ストリームソースを用意する方法はいくつか存在します。UniRxが用意してくれているストリームソースを利用してもよいですし、もちろん自分でストリームソースを作ることもできます。

UniRxを利用する場合、次のような方法でストリームソースを用意することができます。

  • Subjectシリーズを使う
  • ReactivePropertyシリーズを使う
  • ファクトリメソッドシリーズを使う
  • UniRx.Triggersシリーズを使う
  • コルーチンを変換して使う
  • uGUIイベントを変換して使う
  • その他UniRxが用意しているものを使う

それぞれ順番に解説していきましょう。

Subjectシリーズ

第1回から何度も登場しているSubjectですが、これを利用するパターンが最も基本形となります。
自分でストリームを作って自由にイベントを発行したいと思った時はこのSubjectを使ってひとまず問題はないでしょう。

そしてこのSubjectですが、派生がいくつか存在しそれぞれ異なる挙動をとります。用途に合わせて適切なものを利用するとよいでしょう。今回は表として紹介するにとどめ、各Subjectの詳細な説明は次回以降に行いたいと思います。

Subject 機能
Subject<T> 最もシンプルなもの。OnNextが実行されたら値を発行する。
BehaviorSubject<T> 最後に発行された値をキャシュし、後からSubscribeされたときにその値を発行してくれる。初期値を設定することもできる
ReplaySubject<T> 過去全ての発行された値をキャッシュし、後からSubscribeされたときにその値を全てまとめて発行する
AsyncSubject<T> OnNextを直ちに発行せずに内部にキャッシュし、OnCompletedが実行されたタイミングで最後のOnNextを1つだけ発行する。FuturePromiseみたいなもの。

AsyncSubjectはまんまFuturePromiseみたいなものです。非同期で処理を走らせて結果が終わったころに取り出したいという時に利用できます。

ReactivePropertyシリーズ

ReactiveProperty<T>

ReactiveProperty<T> は普通の変数にSubjectの機能をくっつけたものです。(実装もそんな感じになってます)
変数の感覚で定義して利用できるのでわかりやすく、初心者にオススメです。

ReactiveProperty
//int型のReactiveProperty
var rp = new ReactiveProperty<int>(10); //初期値を指定可能

//普通に代入したり、値を読み取ることができる
rp.Value = 20;
var currentValue = rp.Value; //20

//Subscribeもできる(Subscribe時に現在の値も発行される)
rp.Subscribe(x => Debug.Log(x));

//値を書き換えた時にOnNextが飛ぶ
rp.Value = 30;
実行結果
20
30

また、ReactivePropertyはスペクタービューに表示して利用することも可能です。この場合はジェネリクス版ではなく、それぞれの型の専用のReactivePropertyを使う必要があります。

また、ReactivePropertyは次回以降に解説する予定のMV(R)Pパターンでその真価を発揮することになります。それまでにしっかりマスターしておいて下さい!

インスペクタービューに表示できるReactivePropertyの例
public class TestReactiveProperty : MonoBehaviour
{
    //int型のReactiveProperty(インスペクタービューに出る版)
    [SerializeField]
    private IntReactiveProperty playerHealth = new IntReactiveProperty(100);

    void Start()
    {
        playerHealth.Subscribe(x => Debug.Log(x));
    }
}

intrp.png

なお、enumもReactiveProperty化してインスペクタービューに表示することも可能ですが、こちらは一工夫必要です。こちらについてはUniRxの作者のneueccさんがブログの方で解説していますのでそちらを参考にされるとよいでしょう。
UniRx 4.8 - 軽量イベントフックとuGUI連携によるデータバインディング

ReactiveCollection

ReactiveCollection<T>はReactivePropertyと同じようなものであり、状態の変化を通知する機能が内蔵されたList<T>です

ReactiveCollectionは普通のListと同じように使うことができるうえ、状態の変化をSubscribeすることができるようになっています。用意されているイベントは次の通りです。

  • 要素の追加
  • 要素の削除
  • 要素数の変化
  • 要素の上書き
  • 要素の移動
  • リストのクリア
ReactiveCollectionの利用例/追加と削除を監視する
var collection = new ReactiveCollection<string>();

collection
    .ObserveAdd()
    .Subscribe(x =>
    {
        Debug.Log(string.Format("Add [{0}] = {1}", x.Index, x.Value));
    });

collection
    .ObserveRemove()
    .Subscribe(x =>
    {
        Debug.Log(string.Format("Remove [{0}] = {1}", x.Index, x.Value));
    });

collection.Add("Apple");
collection.Add("Baseball");
collection.Add("Cherry");
collection.Remove("Apple");
実行結果
Add [0] = Apple
Add [1] = Baseball
Add [2] = Cherry
Remove [0] = Apple

ReactiveDictionary<T1,T2>

ReactiveDictionaryのDictionary版です。ReactiveCollectionとほとんど挙動が同じなので省略します。

ファクトリメソッドシリーズ

ファクトリメソッドとは、UniRxが提供しているストリームソース構築メソッド群のことです。
Subjectだけでは表現できないような複雑なストリームを簡単につくることができる場合があります。UnityでUniRxを利用する場合はファクトリメソッドを利用する機会はそんなにないかもしれませんが、どこかで役に立つことがあると思うので覚えておいてもいいでしょう。

ただしファクトリメソッドは数が多いので、利用頻度が高いものを抜粋して紹介したいと思います。

もし全てのファクトメソッドを知りたい場合、ReactiveXのOperatorsの項目を参考にされるとよいでしょう。

ReactiveX Creating Observablesの項目

Observable.Create

Observable.Create<T> は自由に値を発行するストリームをつくることができるファクトリメソッドです。例えば、一定の手続きで処理の呼び出しルールをこのファクトリメソッド内部に隠蔽してしまい、結果だけをストリームで取り出すといった使い方ができます。

Observable.Createは引数にFunc<IObserver<T>, IDisposable> (IObserver<T>を受け取ってIDisposableを返す関数) を引数にとります。実際に使い方をみてもらった方がわかりやすいと思います。

Observable.Createの例
//0から100まで10刻みで値を発行するストリーム
Observable.Create<int>(observer =>
{
    Debug.Log("Start");

    for (var i = 0; i <= 100; i += 10)
    {
        observer.OnNext(i);
    }

    Debug.Log("Finished");
    observer.OnCompleted();
    return Disposable.Create(() =>
    {
        //終了時の処理
        Debug.Log("Dispose");
    });
}).Subscribe(x => Debug.Log(x));
実行結果
Start
0
10
20
30
40
50
60
70
80
90
100
Finished
Disposable

Observable.Start

Observable.Start は与えられたブロックを別スレッドで実行し結果を1つだけ発行するファクトリメソッドです。非同期で何か処理を走らせ、結果が出たら通知してほしい時に利用することができます。

Observable.Start
//与えられたブロック内部を別スレッドで実行する
Observable.Start(() =>
{
    //GoogleのTOPページをHTTPでGETする
    var req = (HttpWebRequest)WebRequest.Create("https://google.com");
    var res = (HttpWebResponse)req.GetResponse();
    using (var reader = new StreamReader(res.GetResponseStream()))
    {
        return reader.ReadToEnd();
    }
})
.ObserveOnMainThread() //メッセージを別スレッドからUnityメインスレッドに切り替える
.Subscribe(x => Debug.Log(x));

1つ注意しなくてはいけない点があります。Observable.Startは処理を別スレッドで実行しそのスレッドからそのままSubscribe内の関数を実行します。 これはスレッドセーフではないUnityにおいて問題を引き起こすことがあり注意する必要があります。
もしメッセージを別スレッドからメインスレッドに切り替えたい場合、ObserveOnMainThreadというオペレータを利用しましょう。このオペレータを挟むことで、このオペレータ以降がUnityのメインスレッドで実行されるように変換されます。

Observable.Timer/TimerFrame

Observable.Timer は一定時間後にメッセージを発行するシンプルなファクトリメソッドです。
実時間で指定する場合はTimerを、Unityのフレーム数で指定する場合はTimerFrameを利用しましょう。

Timer TimerFrameは引数によって挙動が変化します。1個しか指定しない場合はOneShotな動作で終了し、2個指定した場合は定期的にメッセージを発行する挙動になります。また、スケジューラを指定することで実行するスレッドを指定することも可能です。

また、似たファクトリメソッドしてObservable.Interval/IntervalFrameが存在します。こちらはTimer/TimerFrame の2個引数を指定する場合の省略版みたいなものとなっています。Interva/IntervalFrameではタイマを起動するまでの時間(第一引数)を指定することができなくなっています。

Timerの例
//5秒後に1回だけメッセージを発行して終了
Observable.Timer(System.TimeSpan.FromSeconds(5))
    .Subscribe(_ => Debug.Log("5秒経過しました"));

//5秒後から1秒おきにメッセージを発行する
//自分で停止させない限りずっと動き続ける
Observable.Timer(System.TimeSpan.FromSeconds(5), System.TimeSpan.FromSeconds(1))
    .Subscribe(_ => Debug.Log("一定間隔で実行されています"))
    .AddTo(gameObject);

Timer TimerFrameを定期実行に利用する場合、Disposeのし忘れに十分注意する必要があります。停止するのを忘れて放置しているとメモリリークやNullReferenceExceptionの原因となります。

UniRx.Triggersシリーズ

UniRx.Triggersは、using UniRx.Triggers;を行うことで利用可能になるストリームソースです。UnityのコールバックイベントをUniRxのIObservableに変換して提供してくれるます。UniRxではこれが一番重要かつ便利だと思います。

Triggersは数が多いためとても紹介しきれませんので、GitHubのwikiを参考にしてください。
GitHub - UniRx.Triggers

Unityが提供するほとんどのコールバックイベントをストリームとして取得可能になっており、またGameObjectがDestroyされた時にOnCompletedを自動で発行してくれるしくみになっているため、寿命管理も心配ありません。

Triggersの利用例
using UniRx;
using UniRx.Triggers; //これが必須
using UnityEngine;

/// <summary>
/// WarpZone(という名のIsTriggerなColliderがついた領域)に
/// 侵入した時に浮遊するスクリプト(適当)
/// </summary>
public class TriggersSample : MonoBehaviour
{
    private void Start()
    {
        var isForceEnabled = true;
        var rigidBody = GetComponent<Rigidbody>();

        //フラグが有効な間、上向きに力を加える
        this.FixedUpdateAsObservable()
            .Where(_ => isForceEnabled)
            .Subscribe(_ => rigidBody.AddForce(Vector3.up));

        //WarpZoneに侵入したらフラグを有効にする
        this.OnTriggerEnterAsObservable()
            .Where(x => x.gameObject.tag == "WarpZone")
            .Subscribe(_ => isForceEnabled = true);

        //WarpZoneから出たらフラグを無効にする
        this.OnTriggerExitAsObservable()
            .Where(x => x.gameObject.tag == "WarpZone")
            .Subscribe(_ => isForceEnabled = false);
    }
}

Triggersを使ってUnityのコールバックをストリームにしてしまうと、全てをAwake/Start内にまとめて記述することが可能になります。このあたりのメリットは次回以降に詳しく解説したいと思います。

コルーチンから変換する

実はUnityのコルーチンとUniRxはとても相性がよく、IObservableとコルーチンは相互に変換して利用することが可能になっています。

コルーチンからIObservableに変換するにはObservable.FromCoroutineを利用することで実現することができます。オペレータチェーンでゴリ押しで複雑なストリームを構築するより、コルーチンを併用して手続き的に書いた場合の方がシンプルでわかりやすく書ける場合も存在します。 コルーチンは悪と決めつけず、むしろコルーチンとUniRxと併用したほうが便利であるということを覚えておいて下さい。

UniRxとコルーチンを組み合わせる解説は次回以降に詳しく行いたいと思うので、今回は簡単なサンプルだけ紹介しておくだけにとどめます。

コルーチンからストリームを作る
public class Timer : MonoBehaviour
{
    /// <summary>
    /// 一時停止フラグ
    /// </summary>
    public bool IsPaused { get; private set; }

    void Start()
    {
        //60秒カウントするストリームをコルーチンから作る
        Observable.FromCoroutine<int>(observer => GameTimerCoroutine(observer, 60))
            .Subscribe(t => Debug.Log(t));
    }

    /// <summary>
    /// 初期値から0までカウントするコルーチン
    /// ただしIsPausedフラグが有効な場合はカウントを一時停止する
    /// </summary>
    private IEnumerator GameTimerCoroutine(IObserver<int> observer, int initialCount)
    {
        var current = initialCount;
        while (current > 0)
        {
            if (!IsPaused)
            {
                observer.OnNext(current--);
            }
            yield return new WaitForSeconds(1);
        }
        observer.OnNext(0);
        observer.OnCompleted();
    }
}

uGUIイベントから変換する

UniRxはuGUIとも相性がよく、前述のReactivePropertyと組み合わせることでViewとModelの関係をものすごくキレイに記述することが可能になります(MV(R)Pパターンと呼んでいます)。
今回はMV(R)Pパターンの説明はせず、uGUIイベントから変換する方法だけを紹介します。

と言っても、細かく紹介するところは無く、UniRxをusingしていればuGUIコンポーネントのuGUIイベントとしてそのまま取得することができるようになっています。

uGUIからストリームに変換する例
//uGUIのデフォルトのUnityイベントの名前をしたObservableが用意されている
var button = GetComponent<Button>();
button.OnClickAsObservable().Subscribe();

var inputField = GetComponent<InputField>();
inputField.OnValueChangedAsObservable().Subscribe();
inputField.OnEndEditAsObservable().Subscribe();

var slider = GetComponent<Slider>();
slider.OnValueChangedAsObservable().Subscribe();

//-----------

//なお、このような記述もある
inputField.onValueChanged.AsObservable().Subscribe();

//この2つの記法の違いは、Subscribe時に現在の値を初期値を発行するかどうかである
//Subscribe時に初期値が必要である場合は前者を使うとよい
inputField.OnValueChangedAsObservable(); //初期値あり
inputField.onValueChanged.AsObservable();//初期値なし

その他

UniRxはこれ以外にも便利なストリームソースを用意してくれています。そのうちのいくつかを紹介します。

ObservableWWW

ObservableWWWはUnityのWWWをストリームとして扱えるようにラップしてくれたものです。呼び出すことでUniRxがコルーチンを実行してWWWを処理し、結果だけ通知してくれます。

ObservableWWW
//https://google.comをHTTP GETする
ObservableWWW.Get("https://google.com")
    .Subscribe(x => Debug.Log(x));

(UniRxは内部にコルーチンを持っています。その実体となるGameObjectはMainThreadDispatcherという名前でシーン上に存在しています。このMainThreadDispatcherを止めてしまうとUniRxが正しく動作しなくなるのでこのGameObjectに手を加えるのは避けるのが賢明です。)

Observable.NextFrame

名前の通り、次のフレームでメッセージを発行してくれるストリームを作ることができます。メッセージの発行のタイミングはUpdateではなくコルーチンのタイミングとなるので、実行タイミングがシビアな場合は注意が必要です。
参考:Unityドキュメント イベント関数の実行順

Observable.NextFrame
Observable.NextFrame()
    .Subscribe(_ => Debug.Log("次のフレームで実行されます"));

Observable.EveryUpdate

Observable.EveryUpdate は毎Updateのタイミングを通知してくれるストリームソースです。
UniRx.TriggersのUpdateAsObservableと似ていますがあちらはGameObjectに紐付いておりDestory時にOnCompletedが発行されるのに対し、Observable.EveryUpdate自分で停止しない限りシーンをまたいでも動き続けるストリームとなります。FPSカウンタのような、どのシーンでもずっと使い続けるようなストリームを構築する時に使うといいでしょう。
参考 UniRxでFPSカウンタを作ってみる

ObserveEveryValueChanged

ObserveEveryValueChangedはストリームソースの中でも異色な存在であり、classそのもの(?)の拡張メソッドとして定義されています。。機能としては、任意のオブジェクトのパラーメタを毎フレーム監視して変化があった時に通知するストリームを作成することができます。

ObserveEveryValueChanged
var charcterController = GetComponent<CharacterController>();

//CharacterControllerのIsGroundedを監視
//false → trueになったらログに出す
charcterController
    .ObserveEveryValueChanged(c => c.isGrounded)
    .Where(x => x)
    .Subscribe(_ => Debug.Log("着地!"))
    .AddTo(gameObject);

// ↑のコードは↓とほぼ同義
Observable.EveryUpdate()
    .Select(_=>charcterController.isGrounded)
    .DistinctUntilChanged()
    .Where(x=>x)
    .Subscribe(_ => Debug.Log("着地!"))
    .AddTo(gameObject);

// ObserveEveryValueChangedは
// EveryUpdate + Select + DistinctUntilChanged
// の省略記法と思ってよい

3.まとめ

ストリームの根源(ソース)を作る方法は複数ある

  • Subjectシリーズを使う
  • ReactivePropertyシリーズを使う
  • ファクトリメソッドシリーズを使う
  • UniRx.Triggersシリーズを使う
  • コルーチンを変換して使う
  • uGUIイベントを変換して使う
  • その他UniRxが用意しているものを使う

「UniRx.Triggers」「ReactiveProperty」「uGUIから変換」が個人的には最優先で覚えるべきものだと思います。この3つさえ覚えていればとりあえずUniRxを使った開発で8割くらいはなんとかなると思います。