Unity
UniRx

UniRx入門 その2 - メッセージの種類/ストリームの寿命

More than 1 year has passed since last update.

UniRx入門シリーズ 目次はこちら


0.前回のおさらい

前回、IObserverインターフェースは次のような定義であると説明しました。

IObserver
using System;

namespace UniRx
{
    public interface IObserver<T>
    {
        void OnCompleted();
        void OnError(Exception error);
        void OnNext(T value);
    }
}

前回は解説の都合上、OnNextのみを説明しました。
今回は省略してしまった「OnError」「OnCompleted」、および「Dispose」について解説します。

1. 「OnNext」「OnError」「OnCompleted」

UniRxで発行されるメッセージは全てこの3種類のうちのどれかとなり、以下のような用途で利用されています。

  • OnNext : 通常イベントが発行されたときに通知されるメッセージ
  • OnError : ストリームの処理中で例外が発生したことを通知する
  • OnCompleted: ストリームが終了したことを通知する

「OnNext」メッセージ

OnNextはUniRxにおいて最も多用されるメッセージであり、通常「イベントの通知」(EventArgsを含む)を表しています。

最も利用されるメッセージであり、使い方によってはこのOnNextメッセージのみを意識していれば問題ない場合も多いです。

例1. 整数を通知する

var subject = new Subject<int>();

subject.Subscribe(x => Debug.Log(x));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();
結果
1
2
3

例1ではシンプルに整数を通知し、購読側でDebug.Logに表示するだけの極々単純なサンプルです。

例2. 「意味」のない値を通知する

var subject = new Subject<Unit>();

subject.Subscribe(x => Debug.Log(x));

//Unit型はそれ自身は特に意味を持たない
//メッセージの内容に意味はなく、イベント通知のタイミングのみが重要な時に利用できる
subject.OnNext(Unit.Default);
結果
()

例2では Unit型 という特殊な型を発行しています。
この型は「メッセージの中身に意味は無いよ」という表明をするときに利用します。
これは「イベントが発行されたタイミングが重要であって、OnNextメッセージの中身は何でもいい」という場合に利用することができます。

例えば、「シーンの初期化が完了した」や「プレイヤが死亡した」といったときに利用できます。

例3. シーンの初期化完了をUnit型で通知する

public class GameInitializer : MonoBehaviour
{
    //Unit型を利用
    private Subject<Unit> initializedSubject = new Subject<Unit>();

    /// <summary>
    /// ゲームの初期化が完了したことを通知する
    /// </summary>
    public IObservable<Unit> OnInitializedAsync
    {
        get { return initializedSubject; }
    }

    void Start()
    {
        //初期化開始
        StartCoroutine(GameInitializeCoroutine());

        OnInitializedAsync.Subscribe(_ =>
        {
            Debug.Log("初期化完了");
        });

    }

    IEnumerator GameInitializeCoroutine()
    {
        /**
        初期化処理

        WWWで通信したり、オブジェクトをインスタンス化したりといった
        時間がかかる重い処理をここでやる想定

        **/

        //初期化終了を通知する
        initializedSubject.OnNext(Unit.Default); //タイミングが重要な通知なのでUnitで十分
        initializedSubject.OnCompleted();
    }
}

コルーチンでゲームの初期化を行い、処理が完了したらイベントを発行して通知するといったクラスの実装例です。
このような、イベントは飛ばしたいけどイベントの中身の値は何でも良いといったシチュエーションでUnit型を使う場合が多いです。

(なお、例3ではSubjectを利用しましたが、この場合ではAsyncSubjectの方が適切かもしれません。AsyncSubjectについては次回以降解説します。)

「OnError」メッセージ

OnErrorメッセージは名前の通り例外がストリームの途中で発生した時に通知されるメッセージとなっています。
OnErrorメッセージはストリームの途中でcatchして処理したり、そのままSubscribeメソッドに到達させてハンドリングさせることもできます。 もしOnErrorメッセージがSubscribeまで到達した場合、そのストリームの購読は終了し破棄されてしまいます。

例4. 途中で発生した例外をSubscribeで受け取る

var stringSubject = new Subject<string>();

//文字列をストリームの途中で整数に変換する
stringSubject
    .Select(str => int.Parse(str)) //数値を表現する文字列以外の場合は例外が出る
    .Subscribe(
        x => Debug.Log("成功:" + x), //OnNext
        ex => Debug.Log("例外が発生:" + ex) //OnError
    );

stringSubject.OnNext("1");
stringSubject.OnNext("2");
stringSubject.OnNext("Hello"); //このメッセージで例外が出る
stringSubject.OnNext("4");
stringSubject.OnCompleted();
実行結果
成功:1
成功:2
例外が発生:System.FormatException: Input string was not in the correct format

※SubscribeのオーバーロードのうちのSubscribe(OnNext,OnError)を受け取るタイプを利用しています

例4では、OnNextで送られてきた文字列をSelectオペレータ(値の変換)でint型にパースして表示するストリームを使った例です。
この様に、ストリームの途中で例外が発生してしまったときに、OnErrorメッセージが発行されてSubscribeに通知が来ていることがわかるかと思います。

また、OnError受信後のOnNext("4")は処理がされていません。このように、OnErrorをSubscribeが検知するとストリームの購読を中止してしまうということを覚えておいて下さい。

例5.途中で例外が発生したら再購読する

var stringSubject = new Subject<string>();

//文字列をストリームの途中で整数に変換する
stringSubject
    .Select(str => int.Parse(str))
    .OnErrorRetry((FormatException ex) => //例外の型指定でフィルタリング可能
    {
        Debug.Log("例外が発生したため再購読します");
    })
    .Subscribe(
        x => Debug.Log("成功:" + x), //OnNext
        ex => Debug.Log("例外が発生:" + ex) //OnError
    );

stringSubject.OnNext("1");
stringSubject.OnNext("2");
stringSubject.OnNext("Hello");
stringSubject.OnNext("4");
stringSubject.OnNext("5");
stringSubject.OnCompleted();
実行結果
成功:1
成功:2
例外が発生したため再購読します
成功:4
成功:5

例5では、途中で例外が発生した場合は OnErrorRetry でストリームを再構築して購読を続行させています。
OnErrorRetryはOnErrorが特定の例外であった場合に、再度Subscribeからやり直してくれる例外ハンドリング用オペレータとなっています。(ここで言うSubscribeからやり直すというのはSubjectにIObserverを再登録しにいくという意味です)

例外ハンドリング用オペレータ

OnError発生時に例外をハンドリングすることができるオペレータがいくつか用意されています。ストリームの目指したい挙動に応じて使い分けるといいでしょう。

やりたいこと オペレータ 備考
OnErrorが来たら再度Subscribeしたい Retry
OnErrorを受け取りエラー処理を行い、別のストリームに差し替える Catch
OnErrorを受け取りエラー処理をした後、OnErrorを握りつぶしてOnCompletedに差し替える CatchIgnore
OnErrorが来たらエラー処理をした後にSubscribeし直す(時間指定可能) OnErrorRetry

「OnCompleted」メッセージ

OnCompletedは「ストリームが完了したためこれ以降メッセージを発行しない」ということを通知するメッセージです。

もしOnCompletedメッセージがSubscribeまで到達した場合、OnError同様にそのストリームの購読は終了し破棄されます。 この性質を利用し、ストリームにOnCompletedを適切に発行して上げればまとめて購読終了が実行できるため、ストリームの後始末をする場合はこのメッセージを発行するようにしましょう。

なお、一度OnCompletedを発行したSubjectは再利用不可能となります。SubscribeしてもすぐにOnCompletedが返ってくるようになります。

例6. OnCompletedを検知する

var subject = new Subject<int>();
subject.Subscribe(
    x => Debug.Log(x),
    () => Debug.Log("OnCompleted")
);
subject.OnNext(1);
subject.OnNext(2);
subject.OnCompleted();
実行結果
1
2
OnCompleted

※SubscribeのオーバーロードのうちのSubscribe(OnNext,OnCompleted)を受け取るタイプを利用しています

例6のように、SubscribeにOnCompletedを受け取るオーバーロードも用意されており、これを利用することでOnCompletedを検知することができます。

Subscribeのオーバーロード

UniRx入門 その1にてSubscribeには複数のオーバーロードが存在すると説明しました。
実際には以下の組み合わせのオーバーロードが用意されており、利用したいメッセージに合わさえて選択するとよいでしょう。

  • Subscribe(IObserber observer) ------------------------------基本形
  • Subscribe() --------------------------------------------------- 全てのメッセージを無視
  • Subscribe(Action onNext) ----------------------------------- OnNextのみ
  • Subscribe(Action onNext, Action onError) ------------------ OnNext + OnError
  • Subscribe(Action onNext, Action onCompleted) ----------- OnNext + OnCompleted
  • Subscribe(Action onNext, Action onError, Action onCompleted) -- 全部

2. ストリームの購読終了(Dispose)

つづいて、前回端折ったもうIObservableの「IDisposable」の説明をします。

IObservable
public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

IDisposableはC#に用意されている既定のインターフェースであり、「リソースの開放」を行えるようにするためのメソッド「Dispose()」をただ1つ持つインターフェースです。

Subscribeの返り値がIDisposableということはつまり、 Subscribeメソッドが返すIDisposabelのDisposeを実行すればストリームの購読を終了できるということになります。

例7. Dispose()でストリームの購読終了

var subject = new Subject<int>();

//IDisposeを保存
var disposable = subject.Subscribe(x => Debug.Log(x), () => Debug.Log("OnCompleted"));

subject.OnNext(1);
subject.OnNext(2);

//購読終了
disposable.Dispose();

subject.OnNext(3);
subject.OnCompleted();
実行結果
1
2

例7は購読をDisposeを呼び出すことで途中で中止している例です。

このように、Disposeを呼び出すことで購読を任意のタイミングで停止することができます。
ここで注意してほしいのは、Dispose()を実行して購読中止した場合、OnCompletedが発行されるわけではないという点です。購読中止時の処理をOnCompletedに書いていた場合、Disposeで停止させてしまうと実行されないことになってしまうので注意して下さい。

例8. 特定のストリームのみ購読中止

var subject = new Subject<int>();

//IDisposeを保存
var disposable1 = subject.Subscribe(x => Debug.Log("ストリーム1:" + x), () => Debug.Log("OnCompleted"));
var disposable2 = subject.Subscribe(x => Debug.Log("ストリーム2:" + x), () => Debug.Log("OnCompleted"));
subject.OnNext(1);
subject.OnNext(2);

//ストリーム1だけ購読終了
disposable1.Dispose();

subject.OnNext(3);
subject.OnCompleted();
実行結果
ストリーム1:1
ストリーム2:1
ストリーム1:2
ストリーム2:2
ストリーム2:3
2:OnCompleted

例8は特定のSubscribeのDisposeを呼び出すことでその購読のみを中止させている例です。
OnCompletedを発行すると全てのストリームが購読終了してしまいましたが、Disposeを使えば一部の購読のみを終了させることができるようになります。

3.ストリームの寿命とSubscribeの終了タイミング

UniRxを利用する上で特に気をつけないといけものが、ストリームのライフサイクルです。
オブジェクトが頻繁に出現と削除を繰り返すUnityでは、特にここを意識しないとパフォーマンスの低下やエラーによる動作不具合を引き起こすことになってしまいます。

「ストリーム」の実体は誰が持っているのか?

ストリームの寿命管理を行う上で、「そのストリームは誰の持ち物なのか?」を意識する必要があります。
基本的に、ストリームの実体は「Subject」であり、Subjectが破棄されればストリームも破棄されることとなります。

6112cbd1-dede-e0ee-0521-9fbb71d5cffd.png
前回も解説しましたが、「Subscribe」とはSubjectに関数を登録する処理でした。つまり、ストリームの実体はSubjectが内部に保持する「呼び出す関数リスト(及びその関数に連なるメソッドチェーン)」でであり、Subjectがストリームを管理していることになります。

Subjectが破棄されればストリームも全て破棄される。逆に言えば、Subjectが残っている限りストリームは稼働し続けてしまうということになります。ストリームが参照しているオブジェクトをストリームより先に破棄して放置してしまうと、裏でストリームが動き続けたままのせいでパフォーマンスを低下させたり、メモリリークを引き起こしたり、NullReferenceExceptionを発生させてゲームを停止させてたりしてしまう可能性が出てきます。

ストリームの寿命管理は細心の注意を払い、使い終わったら必ずDisposeを呼ぶ、またはOnCompletedを発行する癖をつけましょう。

例9 プレイヤの座標をイベント通知で書き換える

概要

アクションゲームを想定してこんな例を考えてみましょう。

  • プレイヤは操作することができる
  • タイマで時間をカウントしている
  • タイマが0になったときにプレイヤを初期座標に戻す
  • プレイヤは画面外に出ると死亡(消滅)する

実装

タイマ側
using System.Collections;
using UniRx;
using UnityEngine;

/// <summary>
/// カウントダウンしてその時の値を通知する
/// 3,2,1,0,(OnCompleted) といったイベントが飛ぶ
/// </summary>
public class TimeCounter : MonoBehaviour
{
    [SerializeField] private int TimeLeft = 3;

    //タイマストリームの実体はこのSubject
    private Subject<int> timerSubject = new Subject<int>();

    public IObservable<int> OnTimeChanged
    {
        get { return timerSubject; }
    } 

    void Start()
    {
        StartCoroutine(TimerCoroutine());

        //現在のカウントを表示
        timerSubject.Subscribe(x => Debug.Log(x));
    }

    IEnumerator TimerCoroutine()
    {
        yield return null;

        var time = TimeLeft;
        while (time >= 0)
        {
            timerSubject.OnNext(time--);
            yield return new WaitForSeconds(1);     
        }
        timerSubject.OnCompleted();
    }
}
Player
using UniRx;
using UnityEngine;

//プレイヤの移動処理を行う
//タイマが0になったら初期座標に戻す
public class PlayerMover : MonoBehaviour
{
    [SerializeField]
    private TimeCounter _timeCounter;

    private float _moveSpeed = 10.0f;

    void Start()
    {
         //タイマを購読
        _timeCounter.OnTimeChanged
            .Where(x => x == 0) //タイマが0になった時のみ実行
            .Subscribe(_ =>
            {
                //タイマが0になったら初期座標に戻る
                transform.position = Vector3.zero;
            });
    }

    void Update()
    {
        //右矢印を押している間移動する
        if (Input.GetKey(KeyCode.RightArrow))
        {
            transform.position += new Vector3(1, 0, 0) * _moveSpeed * Time.deltaTime;
        }

        //画面外に出たら削除する
        if (transform.position.x > 10)
        {
            Debug.Log("画面画に出た");
            Destroy(gameObject);
        }
    }
}

実行結果(タイマ0でプレイヤが生存していた場合)

ok.gif

タイマ0になったときにプレイヤの座標が正しく初期位置に上書きされています。
この様に、プレイヤが生存していた場合はこのコードでも正しく実行されます。

実行結果(タイマ0でプレイヤが削除されていた場合)

ok2.gif

タイマ0の時点でプレイヤが削除されていた場合、このコードではMissingReferenceException例外が発生してしまいました。
つまり、このコードではタイマ0の時点でプレイヤが消滅していた場合はおかしくなるということがわかりました。

原因

原因はPlayerMoverのこの部分にあります。

PlayerMoverの一部抜粋
void Start()
{
    _timeCounter.OnTimeChanged
        .Where(x => x == 0) //タイマが0になった時のみ実行
        .Subscribe(_ =>
        {
            //タイマが0になったら初期座標に戻る
            transform.position = Vector3.zero;
        });
}

先程言ったとおり、ストリームの実体はSubjectが保持しています。
PlayerMoverが削除されたとしても、TimeCounter.timerSubjectがtransform.position = Vector3.zero;を呼び出してしまうということになります。
そしてPlayerのtransformにアクセスしようとし、transformの取得に失敗してMissingReferenceExceptionが発生してしまう、というのがこの例外がでる原因です。

名称未設定-1.png

この様にストリームの寿命とオブジェクトの寿命が一致していない場合、ストリームを正しく止めて上げないと動作不良を起こす原因となってしまいます。

対策

対策は単純に、 PlayerのGameObjectが破棄されたらストリームの購読を停止すればよいです。

実装方法はいくつかありますが、今回は一番簡単なAddToを使った例を紹介しておきます。

PlayeMoverの改善
void Start()
{
    _timeCounter.OnTimeChanged
        .Where(x => x == 0) //タイマが0になった時のみ実行
        .Subscribe(_ =>
        {
            //タイマが0になったら初期座標に戻る
            transform.position = Vector3.zero;
        }).AddTo(gameObject); //指定のgameObjectが破棄されたらDisposeする
}

UniRxにはAddToというメソッドが用意されており、Subscribeに続けてAddTo(gameObject)と書くことで、指定のGameObjectがDestroyされた自動的にDisposeを呼び出してくれるように設定することができます。

こうすることで、Playerが削除されたら同時にストリームの購読も中止されるため、先程のような例外は発生しなくなります。

5.gif
画面外に出てオブジェクトが削除されても例外が出なくなった

4.まとめ

メッセージの種類は3種類存在する

  • OnNext : 通常イベントが発行されたときに通知されるメッセージ
  • OnError : ストリームの処理中で例外が発生したことを通知する
  • OnCompleted: ストリームが終了したことを通知する

ストリームの購読を中断するパターンは3つある

  • SubscribeがOnCompletedを検知する
  • SubscribeがOnErrorを検知する
  • Subscribeが返すIDisposableのDispose()を呼び出す

ストリームの寿命とオブジェクトの寿命の関係は常に意識する必要がある

  • オブジェクトを消したけどストリームは生き残ったまま、という状態は絶対に避ける
  • ストリームを使い終わったらDisposeを呼ぶまたはOnCompletedを発行する癖をつける