Edited at

RxJava 勉強会 Week4

More than 1 year has passed since last update.


RxJava2 の実装について



内容

簡単なサンプルコードを使って


  • Observable.create()

  • Observable.subscribe()

の実装を説明をします。

元ネタ:http://qiita.com/Shiozawa/items/0ccdb43d803de459c6f4



サンプルコード



URL

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week4/Week4.java



サンプルコード

// Observable の作成

Observable<Integer> observable =
Observable.create(e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
});

// Observable のアイテムをコンソールに出力
observable.subscribe(
System.out::println,
System.out::println,
System.out::println,
System.out::println
);



サンプルコードを展開

Observable<Integer> observable =

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});



サンプルコードを展開

observable.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
System.out.println(d);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onComplete() {
// 何も出力しない
System.out.println();
}
});



登場人物



Observable の生成



  • Observable<T> クラス


  • Observable.create() スタティックメソッド


  • ObservableOnSubscribe<T> インタフェース


  • ObservableEmitter<T> インタフェース



Observer の生成と subscribe



  • Observer<T> インタフェース


  • Observable.subscribe() インスタンスメソッド



Observable クラス



2つの役割



  • Observable インスタンスを生成するための static ファクトリーメソッド


  • Observable のインスタンスメソッド



static ファクトリーメソッド



インスタンスメソッド


  • subscribe() やメソッドチェーン

  • それぞれのインスタンスメソッドごとに、Observable を継承したクラスがあり、それを返している


    • map() は ObservableMap 返す





Observable.create()


  • Observable の static ファクトリーメソッドの一つ



Observable.create() ソース


Observable.create()

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {

ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}


  • RxJavaPlugin.onAssembly は通常何もしない


    • テストの実行時のインジェクトなどに利用



  • ObservableCreate を生成して返す



ObservableCreate クラス



特徴



  • Observable を継承している


  • Observable の abstract メソッドである subscribeActual() を実装している


  • source プロパティとして ObservableOnSubscribe への参照を持っている



UML



ここまでのまとめ



サンプルコード

Observable<Integer> observable =

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});



やってること



  • Observable.create()



    • Observable.create() は static ファクトリーメソッド


    • Observable を継承した ObservableCreate インスタンスを生成




  • ObservableCreate クラス


    • Observable を継承

    • 渡された ObservableOnSubscribe インスタンスへの参照を保持





Observable.subscribe()



サンプルコード

observable.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
System.out.println(d);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onComplete() {
// 何も出力しない
System.out.println();
}
});



Observable.subscribe() コード


Observable.subscribe()

public final void subscribe(Observer<? super T> observer) {

ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}


subscribeActual()observer を渡しているだけ



Observable.subscribeActual() ソース


Observable.subscribeActual()

protected abstract void subscribeActual(Observer<? super T> observer);


⇒ abstract メソッド



ObservableCreate.subscribeActual() ソース


ObservableCreate.subscribeActual()

protected void subscribeActual(Observer<? super T> observer) {

CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
// source は ObservableCreate のプロパティ
// (final ObservableOnSubscribe<T> source;)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}




ObservableCreate.subscribeActula() やっていること



  • CreateEmitter<T> インスタンスを作成



    • observer を渡す




  • Observer インスタンスの onSubscribe() を呼び出す


  • source プロパティの subscribe()CreateEmitter<T> を与えて呼び出す



ObservableCreate.CreateEmitter クラス



  • ObservableEmitter<T> を実装している



    • ObservableEmitter<T>Emitter<T> を継承している




  • observer プロパティとして Observer への参照を保持している



UML



(再掲)ObservableCreate.subscribeActual() ソース


ObservableCreate.subscribeActual()

protected void subscribeActual(Observer<? super T> observer) {

CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
// source は ObservableCreate のプロパティ
// (final ObservableOnSubscribe<T> source;)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}




source.subscribe()


ObservableOnSubscribeの無名実装

new ObservableOnSubscribe<Integer>() {

@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}



CreateEmitter.onNext()


CreateEmitter.onNext()

@Override

public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// observer は CreateEmitter のプロパティ
// (final Observer<? super T> observer;)
observer.onNext(t);
}
}

⇒ onNext をただ呼び出しているだけ



ここまでのまとめ



  • Observer.subscribe()



    • subscribeActual() を呼び出す


    • subscribeActual() は abstract メソッド

    • 今回のケースでは、実装しているのは ObservableCreateObservableCreate.subscribeActual() が呼び出される




  • ObservableCreate.subscribeActual()



    • CreateEmitter<T> を生成


    • CreateEmitter<T> は渡された Observer をラップ


    • CreateEmitter<T>.onNext() が呼び出されると、Observer.onNext() がそのまま呼び出される





わかったこと1

Observable クラスには二つの役割がある。



  • Observable インスタンスを生成するための static ファクトリーメソッドを提供


  • Observable のインスタンスメソッドを提供



わかったこと2

Observable のメソッドは、基本的に Observable の派生クラスを生成して返す。


  • 例1:Observable.create()ObservableCreate を生成

  • 例2:Observable.map()ObservableMap を生成



わかったこと3

Observable.subscribe()Observable.subscribeActual() を呼び出している。



  • subscribeActual() は abstract メソッド


  • Observable を継承したクラスが性質に応じて subscribeActual() を実装



わかったこと4

ObservableCreate.subscribeActual() は次の処理を行っている。



  • Observer をラップするの CreateEmitter<T> を生成


  • ObservableOnSubscribe.subscribe() にその CreateEmitter<T> インスタンスを渡す


  • CreateEmitter<T>onNext()ObserveronNext() に転送



わかったこと5

このケースでは subscribe() をきっかけとして Observable が動き出す


  • ⇒ Cold な Observable である



次回



次回


  • map() などを使ったメソッドチェーンの実装について

  • Scheduler 指定の実装について(tranpoline の効果が分かるかも?)

  • Emitter を動的(?)にする方法(毎回 Observable を作らない)