AsyncSubjectとは
AsyncSubject
とはSubjectシリーズの1つであり、非同期処理を扱うのに特化したSubject
という位置づけです。
ふるまいはJavaScriptのPromise
やScalaのFuture
とほぼ同じです。将来、結果が1つだけ確定する場合において用いることができる非同期処理向けのSubject
です。
UniRxにおけるObservable
はストリームの長さが 1~無限 の長さを扱うことができる仕組みになっています。
このとき、ストリームの長さをあえて1に固定してしまうことで、Future/Promise
と全く同じことができるようになるという発想のもと作られたのがこのAsyncSubjectです。
(Observable
の方がFuture
/Promise
よりも上位概念であるとも言えます)
AsyncSubjectのふるまい
(画像引用: ReactiveX)
AsyncSubject
のふるまいは次の通りです。
- OnNextしても値をすぐに発行しない
- OnCompletedが実行されたときにはじめてOnNextを1つだけ発行する
- 発行するOnNextは一番最後の値となる
- Complete済みのAsyncSubjectをSubscribeした場合、結果のOnNextとOnCompletedを直ちに発行する
Promise
のresolve(value)
がOnNext(value) + OnCompleted
に分解されたと、と考えてもらうとわかりやすいかもしれません。
つかいみち
ふつうのSubject
の場合はSubscribeするより以前に発行されたOnNextは購読できないという性質でした。
ですがこのAsyncSubject
の場合はSubscribeのタイミングによらずにAsyncSubjectが完了したら必ずOnNextとOnCompletedを受け取ることができるようになっています。
この性質から、非同期処理のコールバックの実装に非常に向いています(というかもともとそのための用途のSubjectですし)。
例:オブジェクトの初期化順序を制御する
いきなり変則的な例ですが、便利なので紹介します。
オブジェクトAとオブジェクトBの2つがあり、必ず「A → B」の順番で初期化して欲しいという例を考えてみます。
Unityの場合ですとScript Execution Order
を使いスクリプトの実行順序そのものを調整することで実現することができますが、「実行順序に依存する」という情報がソースコードからUnity側に漏れてしまうため非常に気持ち悪くできれば使いたくない機能です。
そこでAsyncSubject
を使い、初期化順序をうまく実装してみましょう。
実装例
using UniRx;
using UnityEngine;
namespace AsyncSubjectSample
{
/// <summary>
/// 先に初期化されて欲しい方
/// </summary>
public class ObjectA : MonoBehaviour
{
private AsyncSubject<Unit> _initializedAsyncSubject = new AsyncSubject<Unit>();
public IObservable<Unit> OnInitializedAsync
{
get { return _initializedAsyncSubject; }
}
void Start()
{
Debug.Log("ObjectAのStartが実行されました");
//ここでAの初期化処理
Debug.Log("ObjectAの初期化が終わりました");
//初期化完了通知
_initializedAsyncSubject.OnNext(Unit.Default);
_initializedAsyncSubject.OnCompleted();
}
}
}
using UniRx;
using UnityEngine;
namespace AsyncSubjectSample
{
/// <summary>
/// Aの後に初期化されて欲しい方
/// </summary>
public class ObjectB : MonoBehaviour
{
[SerializeField]
private ObjectA objectA;
void Start()
{
Debug.Log("ObjectBのStartが実行されました");
// Aの初期化完了通知が来たらBを初期化する
// OnInitializedAsyncが既にCompleted済みなら直ちに実行される
objectA.OnInitializedAsync.Subscribe(_ => Initialize());
}
void Initialize()
{
//ここでBの初期化処理
Debug.Log("ObjectBの初期化が終わりました");
}
}
}
結果
- BのStart()の方が先に実行された場合
ObjectBのStartが実行されました
ObjectAのStartが実行されました
ObjectAの初期化が終わりました
ObjectBの初期化が終わりました
- AのStart()の方が先に実行された場合
ObjectAのStartが実行されました
ObjectAの初期化が終わりました
ObjectBのStartが実行されました
ObjectBの初期化が終わりました
このように、Start()
の実行順序とは関係なく必ずA → B
の順序で初期化が実行されるようになりました。
まとめ
AsyncSubject
は他のSubject
と比較して挙動が違うので注意が必要です。
とくに、RxはObservableのストリーム長を1に固定することで、Promise/Futureと全く同じ非同期処理の考え方に載せることができると言うことができ、その実装の1つがAsyncSubjectというわけです。
おまけ
AsyncSubject<Unit>
をラップするオブジェクトを作るとちょっと便利だったので紹介します。
using System;
using UniRx;
namespace UniRxExt
{
/// <summary>
/// 名前がRxJavaのSingleとかぶってるけど動作は違います
/// </summary>
public class Single : IObservable<Unit>, IDisposable
{
private readonly AsyncSubject<Unit> _asyncSubject = new AsyncSubject<Unit>();
private readonly object lockObject = new object();
public void Done()
{
lock (lockObject)
{
if (_asyncSubject.IsCompleted) return;
_asyncSubject.OnNext(Unit.Default);
_asyncSubject.OnCompleted();
}
}
public IDisposable Subscribe(IObserver<Unit> observer)
{
lock (lockObject)
{
return _asyncSubject.Subscribe(observer);
}
}
public void Dispose()
{
lock (lockObject)
{
_asyncSubject.Dispose();
}
}
}
}
使い方
using UniRx;
using UniRxExt;
using UnityEngine;
namespace AsyncSubjectSample
{
/// <summary>
/// 先に初期化されて欲しい方
/// </summary>
public class ObjectA : MonoBehaviour
{
private Single _single = new Single();
public IObservable<Unit> OnInitializedAsync
{
get { return _single; }
}
void Start()
{
Debug.Log("ObjectAのStartが実行されました");
//ここでAの初期化処理
Debug.Log("ObjectAの初期化が終わりました");
//初期化完了通知
// OnNext + OnCompletedをくっつけて呼べるだけ
_single.Done();
}
}
}