はじめに
HotなObservableとは何なのかソースコードを読んで理解してみる
書いてる間にソースコード読んでいたら割とColdの挙動もちゃんと把握していなかったことに気がついたのでこれについての読解記事です。
UniRxのほうが本家より理解しやすいのでこっちを読みながら解説していきます。
2種類のColdなIObservable
ColdなIObservableは今回次の二種類に分けて紐解いていきたいと思います。
- 自分自身が値を作成していくジェネレータタイプ
- ReturnやRange,それからCreateにTimer等メソッドチェインの先頭に来るIObservable
- 流れてきたデータを加工して下流に流していくオペレータタイプ
- Select,Whereなどのオペレータや演算子と呼ばれるIObservable
今回この二種類について解説を書いていきます。
自分自身が値を作成していくジェネレータタイプ
まず、とても理解しやすいのでこちらを解説します。
一番簡単なReturnメソッドを読んでいきます。
このコードってどういう処理がされるの?という話です。
Observable.Return(1).Subscribe( Foo );
以下Returnメソッドの抜粋です
public static IObservable<T> Return<T>(T value)
{
return Return<T>(value, Scheduler.DefaultSchedulers.ConstantTimeOperations);
}
public static IObservable<T> Return<T>(T value, IScheduler scheduler)
{
if (scheduler == Scheduler.Immediate)
{
return new ImmediateReturnObservable<T>(value);
}
else
{
return new ReturnObservable<T>(value, scheduler);
}
}
Scheduler.DefaultSchedulers.ConstantTimeOperationsとScheduler.Immediateは基本的に同じものを指しているので、また今回Schedulerの説明はしないのでここでは以下のImmediateReturnObservableが返されるとして話をすすめます。
return new ImmediateReturnObservable<T>(value);
このクラスの抜粋が以下になります。
internal class ImmediateReturnObservable<T> : IObservable<T>, IOptimizedObservable<T>
{
readonly T value;
public ImmediateReturnObservable(T value)
{
this.value = value;
}
/* 今回の解説で使用しない1メソッドなので略 */
public IDisposable Subscribe(IObserver<T> observer)
{
observer.OnNext(value);
observer.OnCompleted();
return Disposable.Empty;
}
}
コンストラクタはデータを保持しているわけですね。
SubscribeメソッドをみてみるとobserverにOnNextで値を入れてOnCompletedで完了通知して、キャンセルできないのでDisposable.Emptyという無意味なDisposableを入れます。
このメソッドが呼ばれるたびに引数に毎回同じようにOnNextなどの操作をするわけですね。
さて、よく見ると引数がFunc<T>とかではなくIObserver<T>になっています。
実はここは拡張メソッドで定義されていてIObservable<T>にはSubscribe(IObserver<T> observer)しか定義されていません。
というわけで、その拡張メソッドを見に行きます。
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
{
return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, Stubs.Nop));
}
internal static IObserver<T> CreateSubscribeObserver<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
/* わかりやすくするためにコードを省略しました */
return new Subscribe<T>(onNext, onError, onCompleted);
}
この宣言を觀てもらえば分かる通りSubscribe(this IObservable source, Action onNext)はSubscribeというObserverを渡された引数のFunc<T>を使って作成してそれを返しています。
じゃあ、こいつなんなの?と言うと以下です。
class Subscribe<T> : IObserver<T>
{
readonly Action<T> onNext;
readonly Action<Exception> onError;
readonly Action onCompleted;
int isStopped = 0;
public Subscribe(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnNext(T value)
{
if (isStopped == 0)
{
onNext(value);
}
}
public void OnError(Exception error)
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onError(error);
}
}
public void OnCompleted()
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onCompleted();
}
}
}
Interlocked.Incrementはスレッドセーフなインクリメントなので、要するにOnNext,OnError,OnCompletedが呼ばれたら対応するメソッド呼んでるだけです。
結局Observable.Return(1).Subscribe(Foo)は
- Fooが登録されたSubscribe<T>が作られる
- そのSubscribe<T>がObservable.Return(1)のSubscribeメソッドに渡される
- このSubscribeはSubscribe<T>.OnNext(1)を呼び出す。
- OnNextは登録されたonNextであるFoo(1)を呼び出す。
というながれになります。
これでReturnの流れがわかりました。
基本的な流れがつかめたところでもう一つTimerを読んでみます。
public static IObservable<long> Timer(TimeSpan dueTime)
{
return new TimerObservable(dueTime, null, Scheduler.DefaultSchedulers.TimeBasedOperations);
}
internal class TimerObservable : OperatorObservableBase<long>
{
readonly DateTimeOffset? dueTimeA;
readonly TimeSpan? dueTimeB;
readonly TimeSpan? period;
readonly IScheduler scheduler;
/* DateTimeOffsetは今回使用しないので中略 */
public TimerObservable(TimeSpan dueTime, TimeSpan? period, IScheduler scheduler)
: base(scheduler == Scheduler.CurrentThread)
{
this.dueTimeB = dueTime;
this.period = period;
this.scheduler = scheduler;
}
protected override IDisposable SubscribeCore(IObserver<long> observer, IDisposable cancel)
{
var timerObserver = new Timer(observer, cancel);
var dueTime = (dueTimeA != null)
? dueTimeA.Value - scheduler.Now
: dueTimeB.Value;
// one-shot
if (period == null)
{
return scheduler.Schedule(Scheduler.Normalize(dueTime), () =>
{
timerObserver.OnNext();
timerObserver.OnCompleted();
});
}
/* 以下Intervalメソッドなどでの利用なので略 */
}
SubscribeCoreはOperatorObservableBaseのSubscribeメソッドが呼び出されたときのメインメソッドです。
基本的にSubscribeメソッドだと思ってください。( cancelは今回無視でいいです)
これをみてみましょう。
TimerクラスはOnNextされるたびに引数のobserverにobserver.OnNext(onNextした回数);をするObserverです。難しくないので気になるなら読みに行ってください。
次の部分はdueTime後にtimerObserver.OnNext();とtimerObserver.OnCompleted();を実行するよという感じです。戻り値はおなじみのDisposeされたらキャンセルするやつです。
return scheduler.Schedule(Scheduler.Normalize(dueTime), () =>
{
timerObserver.OnNext();
timerObserver.OnCompleted();
});
まとめるとTimerメソッドのSubscribeは
- Timerというobserverを作る
- 時間になったらTimerのOnNextとOnCompleteを呼ぶ
- 結局引数で渡されたobserverのOnNextが呼ばれる
という感じです。
ここまで見るとわかるのですが、このクラスはSubscribeメソッドで渡されたobserverを記憶しません。
戻り値のIDisposableもtimerObserverも記憶しません。
つまり二回以上呼ばれたら何度も作り直すことになりますね。副作用はありません。
一回目のobserverでの戻り値と二回目のobserver'での戻り値に一切関係がありません。無関係のオブジェクトになります。
またColdの性質であるのSubscribeされるまで確かに何もしていないことがわかります。
ふむなるほどこれがColdか
というわけで次はオペレータタイプをみていきます。
流れてきたデータを加工して下流に流していくオペレータタイプ
今回は簡単なWhereに登場してもらいます。
こんな感じになってます
public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
{
/* 最適化のための部分なので中略 */
return new WhereObservable<T>(source, predicate);
}
internal class WhereObservable<T> : OperatorObservableBase<T>
{
readonly IObservable<T> source;
readonly Func<T, bool> predicate;
readonly Func<T, int, bool> predicateWithIndex;
public WhereObservable(IObservable<T> source, Func<T, bool> predicate)
: base(source.IsRequiredSubscribeOnCurrentThread())
{
this.source = source;
this.predicate = predicate;
}
/* 最適化部分なので中略 */
protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
{
/* インデックス付き述語のためのifがあるので削除 */
return source.Subscribe(new Where(this, observer, cancel));
}
}
class Where : OperatorObserverBase<T, T>
{
readonly WhereObservable<T> parent;
public Where(WhereObservable<T> parent, IObserver<T> observer, IDisposable cancel)
: base(observer, cancel)
{
this.parent = parent;
}
public override void OnNext(T value)
{
var isPassed = false;
try
{
isPassed = parent.predicate(value);
}
catch (Exception ex)
{
try { observer.OnError(ex); } finally { Dispose(); }
return;
}
if (isPassed)
{
observer.OnNext(value);
}
}
public override void OnError(Exception error)
{
try { observer.OnError(error); } finally { Dispose(); }
}
public override void OnCompleted()
{
try { observer.OnCompleted(); } finally { Dispose(); }
}
}
こちらのタイプとジェネレータ型とで唯一の違いはIObservable sourceをコンストラクタが要求しているかどうかです。
こちらは入力を前のデータに依存しています。
それ以外はほとんど同じことがわかると思います。
SubscribeCoreはsourceにWhereというObserverを渡しています。
WhereはOnNextされるとフィルタ関数であるpredicateの結果をみてobserverのOnNextするか捨てるかを決めています。
これだけ!
結局やっぱりSubscribeするまではsourceに何か値が入ってきても何もしてませんね。sourceがHotな・・・例えばReactivePropertyであってもたれながしになります。
また、これもSubscribeしたときの情報を保持しません。
ここまでみていくと
var observable = Observable.Return(1).Where()のobservableにはWhereクラス(source=Return)が入っていることになります。
ここでobservable.Subscribe(observer)とすると、WhereのSubscribeが呼ばれる->sourceはReturnだからReturnのSubscribeが呼ばれる->ReturnはWhereのOnNextを呼ぶ->WhereはobserverのOnNextを呼ぶという流れになることがわかりますね。
これなんかいい例えないかなーと思ったんですけど
Subscribeするたびに専用のデータの通り道が一本一本できる感じかなぁと思います。
あと一本うどん想像しました。
これで基本的なRxの流れはすべて追うことができたと思います。
結論
ColdなIObservableというのは接続されたタイミングでは何もしない。
上流のIObservableや必要な値を保持するだけ。
Subscribeされたときに引数のobserverに値を流したり流す予約をしたり上流とSubscribeでつなげたりする。
このときデータを保持せずSubscribeが呼ばれるたびに新しくつなぎ直して使い回しはしない。