どーいうことか?
ReactiveExtensionsにおいて、Iobservable.SubscribeはIDisposableを戻り値となるから、こいつを使っていにしえから有るEventよっか相当ましなリソース制御が出来るようにゃなった。
けど、状況によっちゃDispose出来るタイミングが無かったりすることがあるので、Subscribeの結果が強参照を持たない様に何とかしてみたので、おつきあい頂ければ幸い。
この問題のゴール
今回、以下のような目標が有ると言うことで一つ。
- Subscribeの結果、IObservableにIObserverの強参照を持たせない。
- IObserverが開放されたら自動的にDisposeを実行して、サブスクライブの自動開放を行う。
- ↑のタイミングはOnNext,OnCompleted,OnErrorの呼ばれたタイミングを使う。
で何をしたのか
何をしたかってーと、WeakSubscribeメソッドと言う拡張メソッドを作成した。まぁ、その中で一服盛ってる。
内部に、WeakObserverを抱えており、こいつが結構キモとなる。以下実装
public static class WeakSubscriber
{
private class WeakObserver<T> : IObserver<T>
{
private readonly WeakReference<IObserver<T>> _observer;
private IDisposable _disposable;
public WeakObserver(IObserver<T> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
_observer = new WeakReference<IObserver<T>>(observer);
}
//This method is not protected from multi thread.
public void SetDisposable(IDisposable disposable)
{
if (disposable == null) throw new ArgumentNullException(nameof(disposable));
if(_disposable!=null) throw new InvalidOperationException();
_disposable = disposable;
}
public void OnNext(T value)
{
IObserver<T> obs;
if (_observer.TryGetTarget(out obs))
{
obs.OnNext(value);
}
else
{
Exchange(ref _disposable, null)?.Dispose();
}
}
public void OnError(Exception error)
{
if (error == null) throw new ArgumentNullException(nameof(error));
IObserver<T> obs;
if (_observer.TryGetTarget(out obs))
{
obs.OnError(error);
}
else
{
Exchange(ref _disposable, null)?.Dispose();
}
}
public void OnCompleted()
{
IObserver<T> obs;
if (_observer.TryGetTarget(out obs))
{
obs.OnCompleted();
}
else
{
Exchange(ref _disposable, null)?.Dispose();
}
}
}
}
こいつが何をしてるかというと、ラップしてる元のIObserverを弱参照化させて、Subscribeされた結果が強参照化しないようにしてる。
また、IDisposableを保持しており、参照先が消えて無くなったら、Subscriptionを開放するようになってる。
また、保持してたIDisposableオブジェクトも、一度Disposeメソッドを読んだ後さっさと開放するようにしてある。
で、こいつを元に、以下のような拡張メソッドをこさえた。
public static class WeakSubscriber
{
public static IDisposable WeakSubscribe<T>(this IObservable<T> observable, IObserver<T> observer)
{
if (observable == null) throw new ArgumentNullException(nameof(observable));
if (observer == null) throw new ArgumentNullException(nameof(observer));
var weak = new WeakObserver<T>(observer);
var token = observable.Subscribe(weak);
weak.SetDisposable(token);
return token;
}
}
こっちは比較的単純。IObservableのSubscriptionの引数に、先のWeakObserverを渡して、貰ってきたIDisposableを設定したら元のIDisposableを返しているだけ。
積み残し
こんなの作っておけば、概ね便利に使える反面、以下の問題は積み残している。
- On~~が呼ばれないと開放されない
- そもそも、On~~の中でDisposeするって言う少し行儀の悪いことをしてる
- コンシューマーが検知できない形でDisposeメソッドが複数回呼ばれる可能性がある
1.に関しちゃ、重くなるけど、内部でタイマなり走らせて、生存確認することで対応出来るとは思う。
3.にかんしては、Disposeメソッドを複数回呼ぶとマズい設計はあんまり良くないと思う(と逃げておくw)
2.が結構問題で、この辺割り切らないと駄目かなぁと思いました。
最後に、ロックフリーで、CompareExchange使っていた部分で、Exchange使っても同じ事が出来ると教えてくださった、@Temarinさん有り難うございました!