はじめに
Rxを学んでいざ自分でストリームの定義をし始めると、Coldなストリームの「Subscribe()
しないと起動しない」という挙動が使いづらいと感じました。
どうにかならないかといろいろ調べていたらCold→Hot変換用のメソッドとSubject<T>
の関連クラスにその答えが見つかったので、そのメモです。
「自分でストリーム作ったら公開するときにHot変換するといい」的なことは見たことありましたがあまりピンと来てなかったので、Rx勉強中の誰かの参考になれば幸いです。
具体的にどういうことか
例えばアニメーションで何かを表示するShow()
というメソッドを作ろうとしたときに、
-
Show()
メソッドを呼び出すが完了通知自体は必要ない、つまり呼び出すだけにしたい -
Show()
メソッドを呼び出して完了通知を受け取りたい、つまりSubscribe()
するもしくはWhenAll()
やZip()
に渡して待ち合わせしたい
という2つの使い方を考えます。
using System.Collections;
using UnityEngine;
using UniRx;
public class Window : MonoBehaviour
{
private RectTransform _rectTransform;
private RectTransform rectTransform {
get { return _rectTransform ?? (_rectTransform = this.GetComponent<RectTransform>()); }
}
// 呼び出すだけで起動したい、かつSubscribeしたら結果を受け取れるようにしたい
public IObservable<Unit> Show()
{
// NOTE: Coldなストリームを返す、つまり起動にはSubscribeが必要
return Observable.FromCoroutine(ShowCoroutine);
}
private IEnumerator ShowCoroutine()
{
this.rectTransform.localScale = new Vector3(0, 1, 1);
yield return null;
while (this.rectTransform.localScale.x < 1) {
this.rectTransform.localScale += new Vector3(0.05f, 0, 0);
yield return null;
}
}
}
両方の用途で使えるようにするには戻り値としてIObservable<T>
を返せばいいのですが、普通に書くと前者はストリームの起動のためだけにSubscribe()
を呼び出す必要があって冗長になってしまいます。
Observable.FromCoroutine
はColdなストリームを返すからです。
ダメなやり方
内部でSubscribe()
を呼び出して起動しておく
単純に「内部でSubscribe()
しちゃえば外側でSubscribe()
しなくても起動されるじゃん」と思ったのでやってみました。
public IObservable<Unit> Show()
{
var stream = Observable.FromCoroutine(ShowCoroutine);
// Subscribeを呼び出して公開前に起動
stream.Subscribe();
return stream;
}
これだとSubscribe()
しない場合の動作はOKですが、Subscribe()
すると2回起動してしまいます(Coldなストリームの特性)。
ちゃんとHot変換するにはMulticast()
メソッドを使用します。
Multicast()
でHot変換後、Connect()
で起動しておく
public IObservable<Unit> Show()
{
var stream = Observable.FromCoroutine(ShowCoroutine).Multicast(new Subject<Unit>());
// NOTE: Publishを使うと短く書ける
// var stream = Observable.FromCoroutine(ShowCoroutine).Publish();
stream.Connect();
return stream;
}
Multicast(ISubject<T> subject)
は内部で一人代表してSubscribe(subject)
し、OnNext(T)
の値を分配することでColdなストリームをHot変換してくれます。戻り値はIConnectableObservable<T>
で、Connect()
を呼び出すと内部的なSubscribe()
を開始、つまりストリームが起動します。
これでSubscribe()
を呼び出しても2回起動しなくなりましたが、今度はSubscribe()
される前にストリーム内の処理が終わったときにOnNext()
が受け取れません。
using System;
using UnityEngine;
using UniRx;
public class WaitForObservable : MonoBehaviour
{
[SerializeField]
private Window _window;
void Start()
{
var showStream = _window.Show();
// WARNING: Subscribe前にShowStreamが完了しているのでDebug.Logは呼ばれない!
Observable.Timer(TimeSpan.FromSeconds(1.5f)).Subscribe(_ => {
showStream.Subscribe(__ => Debug.Log("OnNext()"));
});
}
}
上記の例だと別にOnNext()
でなくてもOnComplete()
が受け取れるので大丈夫といえば大丈夫なのですが、どうしても取りこぼしたOnNext()
の値がほしい場合は困ります。
どうすればいいか
Subject<T>
の亜種で、OnNext(T)
で発行した値をキャッシュしてくれるReplaySubject<T>
を使えばOK。ReplaySubject<T>
はSubscribe()
時にこれまで発行したすべてのOnNext(T)
の値を発行します。
public IObservable<Unit> Show()
{
var stream = Observable.FromCoroutine(ShowCoroutine).Multicast(new ReplaySubject<Unit>());
// NOTE: Replayを使うと短く書ける
// var stream = Observable.FromCoroutine(ShowCoroutine).Replay();
stream.Connect();
return stream;
}
これでストリームがSubscribe()
前に終了していてもSubscribe()
時にOnNext(T)
が発行されます。
Subject<T>
の亜種は他にもありますが、詳細は参考資料のリンク先を参照してください。
上記の例だとFromCoroutine
がOnComplete()
前に一度だけOnNext(T)
を発行するので、AsyncSubject<T>
でもいいですね。
注意!
今回は「Subscribe()
しなくても起動したい」ということでDispose()
しなくても問題ないストリームを前提としましたが、基本的にはメソッド呼び出し側が責任を持ってストリームの寿命を管理できるようにすべきです。
例えばFromCoroutine
に渡すコルーチンが内部的にwhile(true){}
でずっとループし続ける場合などは止める手段がないとメモリリークの原因になります。
using System.Collections;
using UnityEngine;
using UniRx;
public class Window : MonoBehaviour
{
public IObservable<Unit> Show()
{
var stream = Observable.FromCoroutine(ShowCoroutine).Replay();
// WARNING: disposable.Dispose()を呼ばないとコルーチンが停止しない
var disposable = stream.Connect();
return stream;
}
private IEnumerator ShowCoroutine()
{
while (true) {
yield return null;
}
}
}
より安全にHot変換を行うにはRefCount()
をセットで使いましょう。
RefCount()
を使うとSubscribe()
が呼ばれたときにストリームが起動し、Subscribe()
したObserverが全員Dispose()
した時点でストリームがキャンセルされます。
Observable.FromCoroutine
の場合はObserverが全員Dispose()
されたらコルーチンが停止します。
public IObservable<Unit> Show()
{
return Observable.FromCoroutine(ShowCoroutine).Replay().RefCount();
}
ただ、上記の場合はストリームを起動するのに少なくとも1回はSubscribe()
が呼ばれる必要があり、この記事の冒頭に戻ってしまいますね。
結局は「Subscribe()
を必ず呼ばせて寿命管理を呼び出し側の責任とする」のが正しいのかもしれません。
2017/7/6 追記
「Subscribe()
しなくても実行されるストリーム」というのはほかのオペレータと組み合わせたときに待って実行することができないのでそもそも期待される実装とは異なることがほとんどです。この記事の内容は参考程度でお考えください。
参考資料
感想
「Cold→Hot変換よくわからんなぁ」「Subjectの亜種も用途がよくわからんなぁ」と思って放置してたがやりたいことを調べていく上で理解できたので良かった。
次はRxの非同期処理周りを理解したい。