LoginSignup
6
5

More than 5 years have passed since last update.

WeakなSubscribe

Last updated at Posted at 2017-01-04

どーいうことか?

ReactiveExtensionsにおいて、Iobservable.SubscribeはIDisposableを戻り値となるから、こいつを使っていにしえから有るEventよっか相当ましなリソース制御が出来るようにゃなった。
けど、状況によっちゃDispose出来るタイミングが無かったりすることがあるので、Subscribeの結果が強参照を持たない様に何とかしてみたので、おつきあい頂ければ幸い。

この問題のゴール

今回、以下のような目標が有ると言うことで一つ。

  1. Subscribeの結果、IObservableにIObserverの強参照を持たせない。
  2. IObserverが開放されたら自動的にDisposeを実行して、サブスクライブの自動開放を行う。
  3. ↑のタイミングはOnNext,OnCompleted,OnErrorの呼ばれたタイミングを使う。

で何をしたのか

何をしたかってーと、WeakSubscribeメソッドと言う拡張メソッドを作成した。まぁ、その中で一服盛ってる。
内部に、WeakObserverを抱えており、こいつが結構キモとなる。以下実装


public static class WeakSubscriber
{
    private class WeakObserver<T> : IObserver<T>
    {

        private readonly WeakReference<IObserver<T>> _observer;
        private IDisposable _disposable;
        public WeakObserver(IObserver<T> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            _observer = new WeakReference<IObserver<T>>(observer);
        }

        //This method is not protected from multi thread.
        public void SetDisposable(IDisposable disposable)
        {
            if (disposable == null) throw new ArgumentNullException(nameof(disposable));
            if(_disposable!=null) throw new InvalidOperationException();

            _disposable = disposable;

        }

        public void OnNext(T value)
        {
            IObserver<T> obs;

            if (_observer.TryGetTarget(out obs))
            {
                obs.OnNext(value);
            }
            else
            {
                Exchange(ref _disposable, null)?.Dispose();
            }

        }

        public void OnError(Exception error)
        {
            if (error == null) throw new ArgumentNullException(nameof(error));


            IObserver<T> obs;

            if (_observer.TryGetTarget(out obs))
            {
                obs.OnError(error);
            }
            else
            {
                Exchange(ref _disposable, null)?.Dispose();
            }

        }

        public void OnCompleted()
        {
            IObserver<T> obs;

            if (_observer.TryGetTarget(out obs))
            {
                obs.OnCompleted();
            }
            else
            {
                Exchange(ref _disposable, null)?.Dispose();
            }
        }

    }
}

こいつが何をしてるかというと、ラップしてる元のIObserverを弱参照化させて、Subscribeされた結果が強参照化しないようにしてる。
また、IDisposableを保持しており、参照先が消えて無くなったら、Subscriptionを開放するようになってる。

また、保持してたIDisposableオブジェクトも、一度Disposeメソッドを読んだ後さっさと開放するようにしてある。

で、こいつを元に、以下のような拡張メソッドをこさえた。


public static class WeakSubscriber
{


    public static IDisposable WeakSubscribe<T>(this IObservable<T> observable, IObserver<T> observer)
    {
        if (observable == null) throw new ArgumentNullException(nameof(observable));
        if (observer == null) throw new ArgumentNullException(nameof(observer));

        var weak = new WeakObserver<T>(observer);

        var token = observable.Subscribe(weak);
        weak.SetDisposable(token);

        return token;

    }
}

こっちは比較的単純。IObservableのSubscriptionの引数に、先のWeakObserverを渡して、貰ってきたIDisposableを設定したら元のIDisposableを返しているだけ。

積み残し

こんなの作っておけば、概ね便利に使える反面、以下の問題は積み残している。

  1. On~~が呼ばれないと開放されない
  2. そもそも、On~~の中でDisposeするって言う少し行儀の悪いことをしてる
  3. コンシューマーが検知できない形でDisposeメソッドが複数回呼ばれる可能性がある

1.に関しちゃ、重くなるけど、内部でタイマなり走らせて、生存確認することで対応出来るとは思う。
3.にかんしては、Disposeメソッドを複数回呼ぶとマズい設計はあんまり良くないと思う(と逃げておくw)
2.が結構問題で、この辺割り切らないと駄目かなぁと思いました。

最後に、ロックフリーで、CompareExchange使っていた部分で、Exchange使っても同じ事が出来ると教えてくださった、@Temarinさん有り難うございました!

6
5
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
5