背景
UniRxを使用しているときにSubscribe
の内部でSubscribe
を終わらせたくなることがたまにあります。
Observable.EveryUpdate()
.Subscribe(_ =>
{
if (/* 何かの条件 */)
{
// ここでSubscribeをやめたい
}
});
こういう場合、以下の二つの方法で解決することができます。
方法1 TakeUntilを使う
TakeUntil
を使う方法ではとある条件になった時に値を流すストリームを作成し,それを停止判定に使います。
var hoge = Observable.EveryUpdate()
.Where(_ => /* 何かの条件 */)
Observable.EveryUpdate()
.TakeUntil(hoge)
.Subscribe(_ =>
{
// 何かの処理
});
サンプル(クリックされた時に停止)
var click = Observable.EveryUpdate()
.Where(_ => Input.GetmouseButtonDown(0));
Observable.EveryUpdate()
.TakeUntil(click)
.Subscribe(l => Debug.Log(l));
方法2 IDisposableをキャプチャする
方法1は厳密にはSubscribe
の中から購読を停止しているわけではありません。
Subscribe
の中から購読を停止するためにはSubscribe
の戻り値をDispose
する必要があります。
そこでSubscribe
の戻り値を変数に格納し、その変数をラムダ式でキャプチャすれば購読を停止することができます。
// SingleAssignmentDisposableを使う方法
var disposable = new SingleAssignmentDisposable();
disposable.Disposable = Observable.EveryUpdate()
.Subscribe(_ =>
{
if (/* 何かの条件 */)
{
disposable.Dispose();
}
});
// CompositeDisposablを使う方法
// 複数のストリームを同時に止めることができる
var disposables = new CompositeDisposable();
Observable.EveryUpdate()
.Subscribe(_ =>
{
if (/* 何かの条件 */)
{
disposables.Dispose();
}
})
.AddTo(disposables);
Observable.EveryUpdate()
.Subscribe(_ => /* ... */)
.AddTo(disposables);
おまけ
First
を使用すると値が一度流れると購読を停止することができます。
しかしFirst
のソースを読んでもDispose
する処理はすぐには見つかりません。
// FirstCoreのソースを引用
static IObservable<T> FirstCore<T>(this IObservable<T> source, bool useDefault)
{
return Observable.Create<T>(observer =>
{
return source.Subscribe(x =>
{
observer.OnNext(x);
observer.OnCompleted();
}, observer.OnError,
() =>
{
if (useDefault)
{
observer.OnNext(default(T));
observer.OnCompleted();
}
else
{
observer.OnError(new InvalidOperationException("sequence is empty"));
}
});
});
}
しかし、First
を使用すると購読が停止されるためどこかでDispose
を呼んでいるはずです。
答えはObservable.Create
に渡す関数の第一引数であるobserver
にあります。
observer
はIObserver<T>
と定義されていますが、実際のクラスはAnonymouseObserver
なことが多いです。
そしてこのAnonymouseObserver
のOnCompleted
にDispose
処理があります。
// AnonymouseObserverのソースを引用
// disposableはObservable.Createに渡した関数の戻り値を格納したSingleAssignmentDisposable
public void OnCompleted()
{
if (Interlocked.Increment(ref isStopped) == 1)
{
try
{
onCompleted();
}
finally
{
disposable.Dispose();
}
}
}