RxJava2 の実装について
内容
簡単なサンプルコードを使って
- Observable.create()
- Observable.subscribe()
の実装を説明をします。
元ネタ:http://qiita.com/Shiozawa/items/0ccdb43d803de459c6f4
サンプルコード
URL
サンプルコード
// Observable の作成
Observable<Integer> observable =
Observable.create(e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
});
// Observable のアイテムをコンソールに出力
observable.subscribe(
System.out::println,
System.out::println,
System.out::println,
System.out::println
);
サンプルコードを展開
Observable<Integer> observable =
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
サンプルコードを展開
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(d);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onComplete() {
// 何も出力しない
System.out.println();
}
});
登場人物
Observable の生成
-
Observable<T>
クラス -
Observable.create()
スタティックメソッド -
ObservableOnSubscribe<T>
インタフェース -
ObservableEmitter<T>
インタフェース
Observer の生成と subscribe
-
Observer<T>
インタフェース -
Observable.subscribe()
インスタンスメソッド
Observable クラス
2つの役割
-
Observable
インスタンスを生成するための static ファクトリーメソッド -
Observable
のインスタンスメソッド
static ファクトリーメソッド
- 任意の Observable を作成するメソッド
- A Decision Tree of Observable Operators
インスタンスメソッド
- subscribe() やメソッドチェーン
- それぞれのインスタンスメソッドごとに、Observable を継承したクラスがあり、それを返している
- map() は ObservableMap 返す
Observable.create()
- Observable の static ファクトリーメソッドの一つ
Observable.create() ソース
Observable.create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
- RxJavaPlugin.onAssembly は通常何もしない
- テストの実行時のインジェクトなどに利用
- ObservableCreate を生成して返す
ObservableCreate クラス
特徴
-
Observable
を継承している -
Observable
の abstract メソッドであるsubscribeActual()
を実装している -
source
プロパティとしてObservableOnSubscribe
への参照を持っている
UML
ここまでのまとめ
サンプルコード
Observable<Integer> observable =
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
やってること
-
Observable.create()
-
Observable.create()
は static ファクトリーメソッド -
Observable
を継承したObservableCreate
インスタンスを生成
-
-
ObservableCreate
クラス-
Observable
を継承 - 渡された
ObservableOnSubscribe
インスタンスへの参照を保持
-
Observable.subscribe()
サンプルコード
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(d);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onComplete() {
// 何も出力しない
System.out.println();
}
});
Observable.subscribe() コード
Observable.subscribe()
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
⇒ subscribeActual()
に observer
を渡しているだけ
Observable.subscribeActual() ソース
Observable.subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);
⇒ abstract メソッド
ObservableCreate.subscribeActual() ソース
ObservableCreate.subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// source は ObservableCreate のプロパティ
// (final ObservableOnSubscribe<T> source;)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
ObservableCreate.subscribeActula() やっていること
-
CreateEmitter<T>
インスタンスを作成-
observer
を渡す
-
-
Observer
インスタンスのonSubscribe()
を呼び出す -
source
プロパティのsubscribe()
にCreateEmitter<T>
を与えて呼び出す
ObservableCreate.CreateEmitter クラス
-
ObservableEmitter<T>
を実装している-
ObservableEmitter<T>
はEmitter<T>
を継承している
-
-
observer
プロパティとしてObserver
への参照を保持している
UML
(再掲)ObservableCreate.subscribeActual() ソース
ObservableCreate.subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// source は ObservableCreate のプロパティ
// (final ObservableOnSubscribe<T> source;)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
source.subscribe()
ObservableOnSubscribeの無名実装
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}
CreateEmitter.onNext()
CreateEmitter.onNext()
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// observer は CreateEmitter のプロパティ
// (final Observer<? super T> observer;)
observer.onNext(t);
}
}
⇒ onNext をただ呼び出しているだけ
ここまでのまとめ
-
Observer.subscribe()
-
subscribeActual()
を呼び出す -
subscribeActual()
は abstract メソッド - 今回のケースでは、実装しているのは
ObservableCreate
でObservableCreate.subscribeActual()
が呼び出される
-
-
ObservableCreate.subscribeActual()
-
CreateEmitter<T>
を生成 -
CreateEmitter<T>
は渡されたObserver
をラップ -
CreateEmitter<T>.onNext()
が呼び出されると、Observer.onNext()
がそのまま呼び出される
-
わかったこと1
Observable
クラスには二つの役割がある。
-
Observable
インスタンスを生成するための static ファクトリーメソッドを提供 -
Observable
のインスタンスメソッドを提供
わかったこと2
Observable
のメソッドは、基本的に Observable
の派生クラスを生成して返す。
- 例1:
Observable.create()
はObservableCreate
を生成 - 例2:
Observable.map()
はObservableMap
を生成
わかったこと3
Observable.subscribe()
は Observable.subscribeActual()
を呼び出している。
-
subscribeActual()
は abstract メソッド -
Observable
を継承したクラスが性質に応じてsubscribeActual()
を実装
わかったこと4
ObservableCreate.subscribeActual()
は次の処理を行っている。
-
Observer
をラップするのCreateEmitter<T>
を生成 -
ObservableOnSubscribe.subscribe()
にそのCreateEmitter<T>
インスタンスを渡す -
CreateEmitter<T>
のonNext()
はObserver
のonNext()
に転送
わかったこと5
このケースでは subscribe() をきっかけとして Observable が動き出す
- ⇒ Cold な Observable である
次回
次回
- map() などを使ったメソッドチェーンの実装について
- Scheduler 指定の実装について(tranpoline の効果が分かるかも?)
- Emitter を動的(?)にする方法(毎回 Observable を作らない)