Edited at

[UniRx]購読を停止する

More than 3 years have passed since last update.


背景

UniRxを使用しているときにSubscribeの内部でSubscribeを終わらせたくなることがたまにあります。

Observable.EveryUpdate()

.Subscribe(_ =>
{
if (/* 何かの条件 */)
{
// ここでSubscribeをやめたい
}
});

こういう場合、以下の二つの方法で解決することができます。


方法1 TakeUntilを使う

TakeUntilを使う方法ではとある条件になった時に値を流すストリームを作成し,それを停止判定に使います。

var hoge = Observable.EveryUpdate()

.Where(_ => /* 何かの条件 */)
Observable.EveryUpdate()
.TakeUntil(hoge)
.Subscribe(_ =>
{
// 何かの処理
});


サンプル(クリックされた時に停止)

var click = Observable.EveryUpdate()

.Where(_ => Input.GetmouseButtonDown(0));
Observable.EveryUpdate()
.TakeUntil(click)
.Subscribe(l => Debug.Log(l));


方法2 IDisposableをキャプチャする

方法1は厳密にはSubscribeの中から購読を停止しているわけではありません。

Subscribeの中から購読を停止するためにはSubscribeの戻り値をDisposeする必要があります。

そこでSubscribeの戻り値を変数に格納し、その変数をラムダ式でキャプチャすれば購読を停止することができます。


// SingleAssignmentDisposableを使う方法
var disposable = new SingleAssignmentDisposable();
disposable.Disposable = Observable.EveryUpdate()
.Subscribe(_ =>
{
if (/* 何かの条件 */)
{
disposable.Dispose();
}
});

// CompositeDisposablを使う方法
// 複数のストリームを同時に止めることができる
var disposables = new CompositeDisposable();
Observable.EveryUpdate()
.Subscribe(_ =>
{
if (/* 何かの条件 */)
{
disposables.Dispose();
}
})
.AddTo(disposables);
Observable.EveryUpdate()
.Subscribe(_ => /* ... */)
.AddTo(disposables);


おまけ

Firstを使用すると値が一度流れると購読を停止することができます。

しかしFirstのソースを読んでもDisposeする処理はすぐには見つかりません。

// FirstCoreのソースを引用

static IObservable<T> FirstCore<T>(this IObservable<T> source, bool useDefault)
{
return Observable.Create<T>(observer =>
{
return source.Subscribe(x =>
{
observer.OnNext(x);
observer.OnCompleted();
}, observer.OnError,
() =>
{
if (useDefault)
{
observer.OnNext(default(T));
observer.OnCompleted();
}
else
{
observer.OnError(new InvalidOperationException("sequence is empty"));
}
});
});
}

しかし、Firstを使用すると購読が停止されるためどこかでDisposeを呼んでいるはずです。

答えはObservable.Createに渡す関数の第一引数であるobserverにあります。

observerIObserver<T>と定義されていますが、実際のクラスはAnonymouseObserverなことが多いです。

そしてこのAnonymouseObserverOnCompletedDispose処理があります。

// AnonymouseObserverのソースを引用

// disposableはObservable.Createに渡した関数の戻り値を格納したSingleAssignmentDisposable

public void OnCompleted()
{
if (Interlocked.Increment(ref isStopped) == 1)
{
try
{
onCompleted();
}
finally
{
disposable.Dispose();
}
}
}