10
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

HotなIObservableとは何なのかソースコードを読んで理解してみる

Last updated at Posted at 2017-02-03

HotなIObservableとは何なのかソースコードを読んで理解してみる

RxのHotなIObservableとはいったい何をやっているのかソースコードを読んで理解してみます。

たぶん、概念的な話とかサンプルとかで理解できる人はこの辺の記事読めばいいと思います。

Rx入門 (14) - Cold to Hot変換

これを読んでなんでSubjectはHotなのよ?と思ったあなた
そこのあなたのための記事になります。

で本家、Rxのコードも読んだんですがちょっと複雑だったので実装が簡単なUniRxのコードを読みながらHotなIObservableってデータ構造的にどうなってるのよ?という話をしていきます。

ColdなIObservableとは何なのかソースコードを読んで理解してみる
続編を書きました。こちらのほうがSubscribeって何よ?という話を含んでいるので先に読むのも良いかもしれません。

HotなIObservableの作り方

HotなIObservableを作る方法はSubjectを作るかPublish系のメソッドを挟むかの二つです。
Publish系メソッドは入ってきたものをConnectableObservableに入れて返すメソッドです。
ということでConnectableObservableを見ていきます。

Publish抜粋
        public static IConnectableObservable<T> Multicast<T>(this IObservable<T> source, ISubject<T> subject)
        {
            return new ConnectableObservable<T>(source, subject);
        }

        public static IConnectableObservable<T> Publish<T>(this IObservable<T> source)
        {
            return source.Multicast(new Subject<T>());
        }

以下がConnectableObservableの実装です。

ConnectableObservable抜粋
    public static partial class Observable
    {
        class ConnectableObservable<T> : IConnectableObservable<T>
        {
            readonly IObservable<T> source;
            readonly ISubject<T> subject;
            readonly object gate = new object();
            Connection connection;

            public ConnectableObservable(IObservable<T> source, ISubject<T> subject)
            {
                this.source = source.AsObservable();
                this.subject = subject;
            }

            public IDisposable Connect()
            {
                lock (gate)
                {
                    // don't subscribe twice
                    if (connection == null)
                    {
                        var subscription = source.Subscribe(subject);
                        connection = new Connection(this, subscription);
                    }

                    return connection;
                }
            }

            public IDisposable Subscribe(IObserver<T> observer)
            {
                return subject.Subscribe(observer);
            }

コンストラクタは代入してるだけ、SubscribeはSubjectに登録しているだけです。
ですので、このままでは登録したobserverとデータが流れてくるsourceはつながっていません。
そこでConnectメソッドを使ってつなげます。

ちなみにConnectの必要性なんですけれどないとこれがないとColdなIObservableから入ってきたデータはSubscribeされた瞬間にすべて流れてしまうので必要になっています。

connectionはこのつながりをIDisposableで切るためのものなので置いておいて、結局やっているのは「source.Subscribe(subject)」だけです。
この瞬間にsourceが例えばObservable.Rangeとかだった場合はデータが流れ込むわけです。
Subjectがデータの源流だったらConnectが呼ばれるまでは垂れ流しになります。

というわけでHotなIObservableっていうのは結局SubjectになるわけなのでSubjectの中身を読めばHotなIObservableがなんなのかがわかるわけですね。
というわけでSubjectの中身を読んでみます。

Subjectの挙動

というわけでSubjectの挙動を見ていきましょう。

SubjectのSubscribeの部分はこうなってます。

subject抜粋
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null) throw new ArgumentNullException("observer");

            var ex = default(Exception);

            lock (observerLock)
            {
                ThrowIfDisposed();
                if (!isStopped)
                {
                    var listObserver = outObserver as ListObserver<T>;
                    if (listObserver != null)
                    {
                        outObserver = listObserver.Add(observer);
                    }
                    else
                    {
                        var current = outObserver;
                        if (current is EmptyObserver<T>)
                        {
                            outObserver = observer;
                        }
                        else
                        {
                            outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
                        }
                    }

                    return new Subscription(this, observer);
                }

                ex = lastError;
            }

var listObserver = outObserver as ListObserver<T>;以下を読んでいきます。
outObserverはIObserver outObserver = EmptyObserver<T>.Instance;と定義されていています。
また、このoutObserverに対してOnNext,OnComplete,OnErrorが発行されます。(詳細は中身読んでください。)
ここでは重要部分だけ読みます。

以下このメソッドが何度か呼ばれたときにoutObserverがどうなっていくかを見ていきます。

一つ目のobserverが入ってきたとき、outObserverはEmptyObserver<T>なので
outObserver = observer;となります。
つまりこの状態だとそのままobserverにデータを流します。

二つ目のobserverが入ってきたときoutObserverは以下のコードを通ります。
outObserver = new ListObserver(new ImmutableList>(new[] { current, observer }));
ListObserverは名前からしてオブザーバーのリストです。実装的にはOnNextとかはforで回しているだけです。

三つ目以降のobserverが入ってきたときはoutObserverはListObserverなはずなのでoutObserver = listObserver.Add(observer);になります。
つまり追加されるわけです。

要するにListObserverというやつがSubjectのメインでOnNextで入力されたデータをListObserverに受け渡す。それをListObserverがそれぞれの登録されたobserverにデータを改めて流すということのようです。
ほかのオブザーバーはこのようにリストで保持するということをしません。
証拠にListObserverの参照を検索してもらえれば、Subject系以外が参照していないのがわかるでしょう。

ではどうしているかといえば入力のobserverに直接OnNextにデータを流したり(処理した後に流したり)します。Subscribeメソッドでのみobserverは参照されます。IObserverをメンバとして握るということはほとんどしません。
一方でSubjectはSubscribeされた時点ではどういう入力が来るかわからないのでちゃんとにぎにぎします。

結論

要はHotっていうのはデータを流す先のobserverを握っていて自分に流れてきたデータを振り分ける機構のことでそれを持っているのがSubjectってわけね。
これを使って枝分かれできるわけだ。

あと決定的に違うのはSubjectはsubscribeするときに自分自身にobserverを接続するような動きをするけれど、
ほかのオペレータは直接値を流すかさもなければ自分の前のIObservableと後ろのIObserverをつなげることしかせず仕事が終わったら後ろのIObserverは一切触らないということかな。

個人的にはこれでとてもしっくり来たんだけれどどうでしょうか?

10
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?