Edited at

UniRx入門 その5 -コルーチンとの併用-

More than 1 year has passed since last update.

UniRx入門シリーズ 目次はこちら



0.前回のおさらい

前回はUpdate()をどのようにObservableに変換して利用するかの解説を行いました。

今回はさらに一歩進んで、コルーチンと組み合わせてUniRxを利用する方法を紹介したいと思います。


1.コルーチンとUniRx

Unityにはデフォルトで「コルーチン」と呼ばれる機能が用意されています。これは本来はイテレータ処理を実装する際に利用するIEnumeratoryieldキーワードを活用し、Unityのメインスレッド上で非同期処理っぽいことを実現するという機能です。

(たまに勘違いしている人がいますが、コルーチンに記述した処理はUnityのメインスレッド上で実行されます。Update()なんかと同じ扱いで、実行タイミングもだいたい同じです。 参考

そして今まで紹介してきたUniRxですが、このコルーチンと併用して利用することでさらに表現の幅を広げることができます。というのも、UniRx単体では宣言的に記述したストリームはif分岐ができなかったり、ストリームの結果を利用してそのまま手続き的な処理に繋げるといった記述が困難でした。ですが、コルーチンとUniRxを併用することでこれらの問題を解決することができるようになります。また、コルーチンの手続き的に処理を記述できるという性質を活かしつつ、UniRxの柔軟な例外処理を利用するといったことも可能になります。

UniRxとコルーチンの組み合わせは本当に便利なので、ぜひ使い方を覚えて活用してもらえばいいなと思います。

用語解説



  • 宣言的 : 副作用がない関数をメソッドチェーンでつなげ、一連の挙動を記述するやり方


    • メリット: 必要な処理を順番につなげて書くだけで実装できて可読性も高い

    • デメリット: 要求する処理が複雑すぎる場合、出来合いの関数だけでは実装できない場合がある




  • 手続き的 状態変数やforやif文を使って、挙動を1から全部記述するやり方


    • メリット: 自分で好きに書けるのでどんな処理も実装可能

    • デメリット: 煩雑な記述が増え可読性が低くなる




2. コルーチンからIObservableに変換する

ではまず1つ目の使い方として、コルーチンからIObservableに変換する方法を紹介します。

コルーチンをストリームに変換すれば、コルーチンの結果を使ってそのまま自然な流れでUniRxのオペレーターチェーンに接続してあげることが可能になります。

また、複雑な挙動をするストリームを生成する時はコルーチンで実装してストリームに変換するという手法を取ったほうが、UniRxのオペレーターチェーンのみで頑張ってストリームを構築するよりもシンプルにまとまる場合があります。


Ⅰ.コルーチンの終了タイミングをストリームとして待ち受ける

使うもの: Observable.FromCoroutine

返り値:IObservable<Unit>

第一引数:Func<IEnumerator> coroutine コルーチン本体

第二引数:bool publishEveryYield = false yieldしたタイミングでOnNextを発行するか? (falseでOnCompletedの直前に1度だけ発行,default = false

Observable.FromCoroutine を利用すると、コルーチンの終了タイミングをストリームで扱うことが可能になります。

コルーチンの終了タイミングの通知を必要とする時に利用することができます。


Observable.FromCoroutineの使用例

using System.Collections;

using UniRx;
using UnityEngine;

public class ConvertFromCoroutine : MonoBehaviour
{
void Start()
{
Observable.FromCoroutine(NantokaCoroutine, publishEveryYield: false)
.Subscribe(
_ => Debug.Log("OnNext"),
() => Debug.Log("OnCompleted")
).AddTo(gameObject);
}

IEnumerator NantokaCoroutine()
{
Debug.Log("Coroutine started.");

//なんか処理して待ち受ける的な
yield return new WaitForSeconds(3);

Debug.Log("Coroutine finished.");
}
}



実行結果

Coroutine started.

Coroutine finished.
OnNext
OnComplted

Observable.FromCoroutineSubscribeされるごとに新たにコルーチンを生成して起動してしまうという点に注意して下さい。コルーチンは1つだけ起動しストリームだけを共有して利用したいといった場合にはストリームのHot変換が必要になります。

なお、Observable.FromCoroutineで起動したコルーチンはSubscribeをDisposeすると自動的に停止します。

もしコルーチン上で自身のストリームがDisposeされたことを検知したい場合は、コルーチンの引数にCancellationTokenを渡すことでDisposeを検知することが可能になります。この時のCancellationTokenはObservable.FromCoroutineから取得することができます。


CancellationTokenを利用した呼び出し例

Observable.FromCoroutine(token => NantokaCoroutine(token)) //tokenがCancellationToken



Ⅱ.コルーチンのyield returnの結果を取り出す

使うもの: Observable.FromCoroutineValue<T>

返り値:IObservable<T>

第一引数:Func<IEnumerator> coroutine コルーチン本体

第二引数:bool nullAsNextUpdate = true nullの時にOnNextを発行しない,default = true

Observable.FromCoroutineValue<T>を利用するとコルーチンのyield returnで返された値を取り出してストリームとして利用することが可能になります。

yield return は呼び出されるたびに1フレーム停止するという性質があるため、これを利用して1フレームずつ値を発行することが可能になります。


FromCoroutineValueを利用してリストの中身を1フレームに1つ取り出す処理

using System.Collections;

using System.Collections.Generic;
using UniRx;
using UnityEngine;

public class ConvertFromCoroutine : MonoBehaviour
{
/// <summary>
/// 移動座標リスト
/// </summary>
[SerializeField]
private List<Vector2> moveList;

void Start()
{
//コルーチンから値を取り出す
Observable.FromCoroutineValue<Vector2>(MovePositionCoroutine)
.Subscribe(x => Debug.Log(x));
}

/// <summary>
/// リストから値を1フレームずつ取り出すコルーチン
/// </summary>
IEnumerator MovePositionCoroutine()
{
foreach (var v in moveList)
{
yield return v;
}

// ↑のforeach文はまるごと
// "return moveList.GetEnumerator();"
// と書き換えてもよい
}
}


実行結果

1.png


Ⅲ.コルーチン内部でOnNextを直接発行する

使うもの: Observable.FromCoroutine<T>

返り値:IObservable<T>

第一引数:Func<IObserver<T>, IEnumerator> coroutine IObserver<T>を引数に取るコルーチン

Observable.FromCoroutine<T>IObserver<T>を提供する実装も存在します。このIObserver<T>をコルーチンに渡すことで、コルーチン上の任意のタイミングでOnNextを発行することが可能になります。

この機能を利用することで、内部実装は手続き的な非同期処理で書きつつ、外部からストリームとして扱うといったコルーチンとUniRxの双方のいいとこ取りをすることができてしまいます。

非常に便利で汎用的な機能なのでぜひ覚えて下さい。

なお、OnCompletedは自動で発行されないので、コルーチン終了のタイミングで自分でOnCompltedを発行してあげる必要があります。


一時停止フラグがfalseの時のみ時間を測る

using System.Collections;

using UniRx;
using UnityEngine;

public class ConvertFromCoroutine : MonoBehaviour
{
/// <summary>
/// 一時停止フラグ, trueの時はタイマを止める
/// </summary>
public bool IsPaused;

void Start()
{
Observable.FromCoroutine<long>(observer => CountCoroutine(observer))
.Subscribe(x => Debug.Log(x))
.AddTo(gameObject);
}

/// <summary>
/// 一時停止フラグが経っていない状態の経過秒数をカウントして通知する
/// </summary>
/// <param name="observer">通知用IObserver</param>
/// <param name="cancellationToken">CancellationToken</param>
/// <returns></returns>
IEnumerator CountCoroutine(IObserver<long> observer)
{
long current = 0;
float deltaTime = 0;

// Disposeしたらコルーチンごと止まるから while(true) でも問題なく動く
// 気持ち悪いならCancellationTokenを受け取って利用すれば良い
while (true)
{
if (!IsPaused)
{
//一時停フラグが経っていない間は時間を計測する
deltaTime += Time.deltaTime;
if (deltaTime >= 1.0f)
{
//差分が1秒を超えたら整数部を取り出して集計して通知
var integerPart = (int)Mathf.Floor(deltaTime);
current += integerPart;
deltaTime -= integerPart;

//経過秒数通知
observer.OnNext(current);
}
}
yield return null;
}
}
}


実行結果

count.gif

(一時停止フラグがtrueの間はカウントを停止し、falseになると停止前のカウントから計測を再開する)

「状態に依存した処理」や「途中で処理が大きく分岐する処理」といったものはUniRxのオペレータチェーンのみで記述するのは難しく、場合によっては記述不可能な場合もあったりします。そういった場合はこのようにコルーチンで内部実装を行いストリームに変換してしまうという方法を取ることを推奨します。


Ⅳ.より低コストで軽量なコルーチンを実行する

使うもの: Observable.FromMicroCoroutine / Observable.FromMicroCoroutine<T>

返り値:IObservable<Unit> / IObservable<T>

第一引数:Func<IEnumerator> coroutine / Func<IObserver<T>, IEnumerator> coroutine

引数:FrameCountType frameCountType = FrameCountType.Update Update,FixedUpdate,EndOfFrameのどのタイミングを利用するか

Observable.FromMicroCoroutineおよびObservable.FromMicroCoroutine<T>はそれぞれ先程解説したObservable.FromCoroutine/Observable.FromCoroutine<T>とほぼ同じ挙動をします。

ただし、内部実装は大きく異なっており、コルーチン内で yield return null しか利用できないとう制約がある代わりに、Unity標準のコルーチンと比べ非常に高速に起動し動作する仕組みになっています。この仕組のコルーチンを「マイクロコルーチン」と呼び、UniRxの独自実装になっています。

yield return null;のみで済むコルーチンを作成して起動する場合は、Unity標準のStartCoroutineよりもこのObservable.FromMicroCoroutineを使って起動してあげればより低コストでコルーチンを利用できるようになります。


Observable.FromMicroCoroutineの利用例

void Start()

{
//先程の「一時停止フラグがfalseの時のみ時間を測る」の時のコルーチンを
//FromMicroCoroutineで起動した例
Observable.FromMicroCoroutine<long>(observer => CountCoroutine(observer))
.Subscribe(x => Debug.Log(x))
.AddTo(gameObject);
}


コルーチンからIObservableに変換するまとめ


  • コルーチンからIObservableに変換することができる


  • Observable.FromCoroutine等で実行したコルーチンはMainThreadDispatcherに管理が委譲されるため、寿命管理には注意する必要がある(AddToを忘れない)


  • Observable.FromCoroutine等はSubscribeされた時点で新しくコルーチンを生成して起動してしまうため、1個のコルーチンを共有して複数回Subscribeする際はHot変換が必要になる


3. IObservableからコルーチンに変換する

2つ目の使い方として、UniRxのストリームをコルーチンに変換する方法を紹介します。

このストリームをコルーチンに変換する技を利用することで、コルーチン上でストリームの実行結果を待ち受けて、そのまま処理を続けるといった記述方法が可能になります。

「C#のTaskとそのawaitに相当する」とざっくり思ってもらえればよいかと思います。


ストリームをコルーチンに変換する(Unity 5.3以降)

使うもの: ToYieldInstruction() (IObservable<T>に対する拡張メソッド)

返り値:ObservableYieldInstruction<T>

引数:CancellationToken cancel 処理を中断した場合は引数に渡す(省略可)

引数:bool throwOnError = false OnErrorが発生した時に例外内容をthrowするか?

ToYieldInstructionを利用することで、ストリームをコルーチンとして実行した上でストリームを待ち受けることが可能になります。


ToYieldInstructionの利用例

using System;

using System.Collections;
using UniRx;
using UniRx.Triggers;
using UnityEngine;

public class ConvertToCoroutine : MonoBehaviour
{
void Start()
{
StartCoroutine(WaitCoroutine());
}

IEnumerator WaitCoroutine()
{
// Subscribeの代わりにToYieldInstruction()を利用することで
// コルーチンとしてストリームを扱えるようになる

// 1秒待つ
Debug.Log("Wait for 1 second.");
yield return Observable.Timer(TimeSpan.FromSeconds(1)).ToYieldInstruction();

// ToYieldInstruction()はOnCompletedが発行されてコルーチンを終了する
// そのためOnCompletedが必ず発行されるストリームでのみ利用できる
// 無限に続くストリームの場合はFirstやFirstOrDefaultを使うとよいかも

Debug.Log("Press any key.");
// 何かキーが押されるまで待つ
yield return this.UpdateAsObservable()
.FirstOrDefault(_ => Input.anyKeyDown)
.ToYieldInstruction();
// FirstOrDefaultは条件を満たすとOnNextとOnCompletedを両方発行する

Debug.Log("Pressed.");
}
}


ToYieldInstructionOnCompletedメッセージを受けてyield returnを終了します。そのためOnCompletedを発行しない無限ストリームをToYieldInstructionしてしまうと永遠に終了しない状態になってしまうため注意が必要です。

また、ストリーム中で発行されたOnNextメッセージを利用する場合は、ToYieldInstructionが返すObservableYieldInstruction<T>を変数に保存しておくことで結果の取得が可能になります。


ストリームの結果を取得してコルーチン上で使う

using System;

using System.Collections;
using UniRx;
using UniRx.Triggers;
using UnityEngine;

public class ConvertToCoroutine : MonoBehaviour
{
void Start()
{
StartCoroutine(DetectCoroutine());
}

IEnumerator DetectCoroutine()
{
Debug.Log("Coroutine start!");

// コルーチンが開始されてから
// 3秒以内に最初に自分に触れたオブジェクトを取得する
var o = this.OnCollisionEnterAsObservable()
.FirstOrDefault()
.Select(x => x.gameObject)
.Timeout(TimeSpan.FromSeconds(3))
.ToYieldInstruction(throwOnError: false);

//Timeoutは指定時間以内にストリームが終了しなかった場合に
//OnErrorを発行するオペレータ

// 結果を待ち受ける
yield return o;

if (o.HasError || !o.HasResult)
{
//1秒以内に何もヒットしなかった
Debug.Log("hit object is nothing.");
}
else
{
//何かヒットした
var hitObject = o.Result;
Debug.Log(hitObject.name);
}
}
}


hit.gif

nohit.gif


ストリームをコルーチンに変換する(Unity 5.2以前)

Unity5.2以前ではToYieldInstructionを利用することはできません。代わりにStartAsCoroutineを使うことで同等の処理を行うことができます。


StartAsCoroutine版

IEnumerator DetectCoroutine()

{
GameObject result = null;
bool isTimeout = false;

// コルーチンが開始されてから
// 3秒以内に、最初に自分に触れたオブジェクトを取得する
yield return this.OnCollisionEnterAsObservable()
.FirstOrDefault()
.Select(x => x.gameObject)
.Timeout(TimeSpan.FromSeconds(3))
.StartAsCoroutine(x => result = x, error => isTimeout = true);

// StartAsCoroutineは第一引数の関数に結果が渡されるため、
// そこで事前に定義した変数に結果を代入することで結果を取得できる
// 第二引数はOnError

if (isTimeout || result == null)
{
Debug.Log("hit object is nothing.");
}
else
{
var hitObject = result;
Debug.Log(hitObject.name);
}
}



IObservableからコルーチンに変換するまとめ



  • ToYieldInstructionまたはStartAsCoroutineを利用することでストリームをコルーチンに変換することができる

  • 応用すれば「コルーチンの途中で特定のイベントの発行を待ち受ける」といったことが可能になる


応用例


コルーチンを直列に実行して待ち受ける

CoroutineAを実行 → CoroutineAの終了を受けてCoroutineBを起動する


コルーチンを直列に実行して待ち受ける

void Start()

{
Observable.FromCoroutine(CoroutineA)
.SelectMany(CoroutineB) //SelectManyで合成可能
.Subscribe(_ => Debug.Log("All coroutine finished"));
}

IEnumerator CoroutineA()
{
Debug.Log("CoroutineA start");
yield return new WaitForSeconds(3);
Debug.Log("CoroutineA finished");
}

IEnumerator CoroutineB()
{
Debug.Log("CoroutineB start");
yield return new WaitForSeconds(1);
Debug.Log("CoroutineB finished");
}



実行結果

CoroutineA start

CoroutineA finished
CoroutineB start
CoroutineB finished
All coroutine finished



複数コルーチンを同時に起動して結果を待ち受ける

CoroutineAとCoroutineBを同時に起動し、全て終了してからまとめて処理する


複数コルーチンを同時に起動して順次結果を待ち受ける

void Start()

{
//コルーチンAとコルーチンBを同時に起動し、全部終わるまで待ってから処理する
Observable.WhenAll(
Observable.FromCoroutine<string>(o => CoroutineA(o)),
Observable.FromCoroutine<string>(o => CoroutineB(o))
).Subscribe(xs =>
{
foreach (var x in xs)
{
Debug.Log("result:" + x);
}
});
}

IEnumerator CoroutineA(IObserver<string> observer)
{
Debug.Log("CoroutineA start");
yield return new WaitForSeconds(3);
observer.OnNext("CoroutineA done!");
observer.OnCompleted();
}

IEnumerator CoroutineB(IObserver<string> observer)
{
Debug.Log("CoroutineB start");
yield return new WaitForSeconds(1);
observer.OnNext("CoroutineB done!");
observer.OnCompleted();
}



実行結果

CoroutineB start

CoroutineA start
result:CoroutineA done!
result:CoroutineB done!



重い処理を別スレッドに逃がしつつ、結果をコルーチン上で扱う

コルーチン上で一部処理を別スレッドで実行し、結果が返ってきたら処理をコルーチン上で再開するような実装。Observable.Start()を利用する。

void Start()

{
StartCoroutine(GetEnemyDataFromServerCoroutine());
}

/// <summary>
/// サーバから敵の情報を引いて来るコルーチン
/// </summary>
private IEnumerator GetEnemyDataFromServerCoroutine()
{
//サーバからxmlダウンロード
var www = new WWW("http://api.hogehoge.com/resouces/enemey.xml");

yield return www;

if (!string.IsNullOrEmpty(www.error))
{
Debug.Log(www.error);
}

var xmlText = www.text;

// ParseXml関数を別スレッドで実行
// Observable.Start は引数内の関数をThreadPool上で実行する機能
var o = Observable.Start(() => ParseXml(xmlText)).ToYieldInstruction();

//パース終了待機
yield return o;

if (o.HasError)
{
//パースに失敗
Debug.LogError(o.Error);
yield break;
}

//パースした結果
var result = o.Result;
Debug.Log(result);

/*
この後にまた処理が続く
*/

}

Dictionary<string, EnemyParameter> ParseXml(string xml)
{
//ここでxmlのパースをしてDictionaryに変換して返す的なやつがある想定
return new Dictionary<string, EnemyParameter>();
}

/// <summary>
/// 敵のパラメータ的なやつ
/// </summary>
struct EnemyParameter
{
public string Name { get; set; }
public string Helth { get; set; }
public string Power { get; set; }
}

(↑の実装は↓の書き方の方がシンプルにまとまるけど、あくまでコルーチンを使うとどうなるかの説明なのでゆるして)


UniRxだけでの実装

ObservableWWW.Get("http://api.hogehoge.com/resouces/enemey.xml")

.SelectMany(x => Observable.Start(() => ParseXml(x)))
.ObserveOnMainThread() //処理をメインスレッドに戻す
.Subscribe(result =>
{
//ここにパース結果を使った処理
},
ex => Debug.LogError(ex)
);


まとめ


  • ストリームとコルーチンは相互に変換することができる

  • コルーチンを利用することで、オペレータチェーンだけでは作成不可能なストリームを構築することが可能になる

  • UniRxのコルーチン機構に乗せることで、Unity標準のコルーチンよりも使い勝手やパフォーマンスが向上する場合がある

  • ストリームをコルーチンに変換することで、async/awaitっぽい記述が可能になる(あくまでっぽいだけ)