概要
普通に中気になりますよね?(自分だけですかね?)
またトラブルが有ると中身知りたくなってきますよね?
だいたいどんな仕組みなのかわかるといいことがあるかもと思いみてみました。
(正確ではない簡易クラス図と簡易シーケンス図を利用させていただいています。)
何か間違いなどございましたらご指摘お願いします。
最初にまとめ
中身について公式にドキュメントなどがあるか分からないですが個人的に見てみて、
以下の3フェーズに分かれていそうだと感じました。
1. Observable構築フェーズ → subscribeを呼ぶまでに準備として構造を構築
2. subscribeフェーズ → 実際に実行されていくまでにそれぞれが購読していく
3. 実行フェーズ → 購読しているものに結果を渡していく
例とするプログラムについて
今回はこのプログラムに対して見ていくことで理解していきます。
以下のプログラムを実行すると"Hello,World"が出力されます。
Single.createでSingleが作成され、Schedulerにより実行スレッドが決められ、mapオペレータで文字列が変換され、subscribeで結果を受け取ります。
.subscribeOn(Schedulers.trampoline())は即時実行という意味です。(テストとか用)
Singleは一個だけ値を流すObservableだと思ってください。
Single
.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> e) throws Exception {
e.onSuccess("Hello");
}
})
.subscribeOn(Schedulers.trampoline())
.observeOn(Schedulers.trampoline())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s + ",World";
}
})
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String value) {
System.out.println(value)
}
@Override
public void onError(Throwable e) {
}
});
Observable(Single)構築フェーズ
まずsubscribe()を行う前にObservableの構造を作成します。
map()まで呼び出した状態で、subscribe()を呼ぶ前に以下のような構造が作られます。SingleMapがmap()の返り値となります。
sourceというメンバ変数で記述上で下のほうのオペレーターからの参照が作成されます。
最後のTest$3はe.onSuccess("Hello")を呼び出しているメソッドです。
またmapで実際に実行するメソッド(Test$2)やSchedulerなども持っています。
このSingleMapやSingleObserveOnなどはすべてSingleを継承しているようです。
subscribeフェーズ
アプリケーションのメソッドでSingleMapのsubscribe()メソッドが呼ばれてからSingle.createで渡しているSingleOnSubscribeが実行されるまでをそう呼ぶことにします。
subscribeというのは購読という意味があります。
SingleMapからSingleCreateに至るまでsubscribeを呼んで購読していきます。
基本的に先程Observableの構築フェーズでできたsourceというメンバ変数に対して、subscribeメソッドを呼んでいくという流れです。
SingleSubscribeOnだけは少し違い、subscribeを呼ぶスレッドをここで変更するようです。(trampolineなのでそのまま実行しているっぽい?)
subscribe()では引数として、Observerをそれぞれ作成して渡していたのでこのように構造が作られました。
Test$1はSystem.out.println()を行うObserverです。
またそれぞれのオブジェクトからSchedulerやMapperにアクセスする手段が作られています。
これで上から実行していく環境が整ったようです。
参考: SingleMap内の実際のコード(subscribeでObserverを渡す)
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new SingleObserver<T>() {
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = mapper.apply(value);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
});
}
実行フェーズ
コードの表記上の上からの実行の流れです。実際にアイテムが流れていくことろです。
subscribeフェーズでできたデータ構造により、actualというメンバ変数に対してonSuccess()を呼び出していくことで実行されます。
いくつか特徴がありそうです。
- subscribeのときと逆にSubscribeOnObserverは素通りして、subscribe時に作っていたschedulerを利用してObserveOnSingleObserverはスレッドを切り替えてくれる
- Observable構築時に作っていたmappperにアクセスすることで、mapする
感想
結構複雑だと思っていたのですが、
データ構造に対してクラス図で、処理の流れをシーケンス図で見ていくことでなんとなくの処理の流れを見ていくことができました。
SingleでなくObservableでも多少確認してみましたが同じような流れのようです。