LoginSignup
52

More than 5 years have passed since last update.

RxJava(2.0)の仕組みを理解したいメモ

Last updated at Posted at 2016-11-13

概要

普通に中気になりますよね?(自分だけですかね?)
またトラブルが有ると中身知りたくなってきますよね?
だいたいどんな仕組みなのかわかるといいことがあるかもと思いみてみました。
(正確ではない簡易クラス図と簡易シーケンス図を利用させていただいています。)

何か間違いなどございましたらご指摘お願いします。

最初にまとめ

中身について公式にドキュメントなどがあるか分からないですが個人的に見てみて、
以下の3フェーズに分かれていそうだと感じました。
1. Observable構築フェーズ → subscribeを呼ぶまでに準備として構造を構築
2. subscribeフェーズ → 実際に実行されていくまでにそれぞれが購読していく
3. 実行フェーズ → 購読しているものに結果を渡していく

例とするプログラムについて

今回はこのプログラムに対して見ていくことで理解していきます。

以下のプログラムを実行すると"Hello,World"が出力されます。
Single.createでSingleが作成され、Schedulerにより実行スレッドが決められ、mapオペレータで文字列が変換され、subscribeで結果を受け取ります。

.subscribeOn(Schedulers.trampoline())は即時実行という意味です。(テストとか用)
Singleは一個だけ値を流すObservableだと思ってください。

Single
    .create(new SingleOnSubscribe<String>() {
        @Override
        public void subscribe(SingleEmitter<String> e) throws Exception {
            e.onSuccess("Hello");
        }
    })
    .subscribeOn(Schedulers.trampoline())
    .observeOn(Schedulers.trampoline())
    .map(new Function<String, String>() {
        @Override
        public String apply(String s) throws Exception {
            return s + ",World";
        }
    })
    .subscribe(new SingleObserver<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onSuccess(String value) {
            System.out.println(value)
        }
        @Override
        public void onError(Throwable e) {
        }
    });

Observable(Single)構築フェーズ

まずsubscribe()を行う前にObservableの構造を作成します。

map()まで呼び出した状態で、subscribe()を呼ぶ前に以下のような構造が作られます。SingleMapがmap()の返り値となります。
image
image

sourceというメンバ変数で記述上で下のほうのオペレーターからの参照が作成されます
最後のTest$3はe.onSuccess("Hello")を呼び出しているメソッドです。
またmapで実際に実行するメソッド(Test$2)やSchedulerなども持っています

このSingleMapやSingleObserveOnなどはすべてSingleを継承しているようです。

subscribeフェーズ

アプリケーションのメソッドでSingleMapのsubscribe()メソッドが呼ばれてからSingle.createで渡しているSingleOnSubscribeが実行されるまでをそう呼ぶことにします。
subscribeというのは購読という意味があります。
SingleMapからSingleCreateに至るまでsubscribeを呼んで購読していきます。
基本的に先程Observableの構築フェーズでできたsourceというメンバ変数に対して、subscribeメソッドを呼んでいくという流れです。
SingleSubscribeOnだけは少し違い、subscribeを呼ぶスレッドをここで変更するようです。(trampolineなのでそのまま実行しているっぽい?)

image

ここでまた新しい構造が作られます。
image
image

subscribe()では引数として、Observerをそれぞれ作成して渡していたのでこのように構造が作られました。
Test$1はSystem.out.println()を行うObserverです。
またそれぞれのオブジェクトからSchedulerやMapperにアクセスする手段が作られています
これで上から実行していく環境が整ったようです。

参考: SingleMap内の実際のコード(subscribeでObserverを渡す)

    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new SingleObserver<T>() {
            @Override
            public void onSubscribe(Disposable d) {
                t.onSubscribe(d);
            }

            @Override
            public void onSuccess(T value) {
                R v;
                try {
                    v = mapper.apply(value);
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(e);
                    return;
                }

                t.onSuccess(v);
            }

            @Override
            public void onError(Throwable e) {
                t.onError(e);
            }
        });
    }

実行フェーズ

コードの表記上の上からの実行の流れです。実際にアイテムが流れていくことろです。

subscribeフェーズでできたデータ構造により、actualというメンバ変数に対してonSuccess()を呼び出していくことで実行されます。
image

いくつか特徴がありそうです。

  1. subscribeのときと逆にSubscribeOnObserverは素通りして、subscribe時に作っていたschedulerを利用してObserveOnSingleObserverはスレッドを切り替えてくれる
  2. Observable構築時に作っていたmappperにアクセスすることで、mapする

感想

結構複雑だと思っていたのですが、
データ構造に対してクラス図で、処理の流れをシーケンス図で見ていくことでなんとなくの処理の流れを見ていくことができました。
SingleでなくObservableでも多少確認してみましたが同じような流れのようです。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
52