RxJava
RxJava2

RxJava 勉強会 Week4

More than 1 year has passed since last update.

RxJava2 の実装について


内容

簡単なサンプルコードを使って

  • Observable.create()
  • Observable.subscribe()

の実装を説明をします。

元ネタ:http://qiita.com/Shiozawa/items/0ccdb43d803de459c6f4


サンプルコード


URL

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week4/Week4.java


サンプルコード

// Observable の作成
Observable<Integer> observable =
    Observable.create(e -> {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    });

// Observable のアイテムをコンソールに出力
observable.subscribe(
    System.out::println,
    System.out::println,
    System.out::println,
    System.out::println
);

サンプルコードを展開

Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override 
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
       }
    });

サンプルコードを展開

observable.subscribe(new Observer<Integer>() {
    @Override 
    public void onSubscribe(Disposable d) {
        System.out.println(d);
    }
    @Override 
    public void onNext(Integer integer) {
        System.out.println(integer);
    }
    @Override 
    public void onError(Throwable e) {
        System.out.println(e);
    }
    @Override 
    public void onComplete() {
       // 何も出力しない
       System.out.println();
    }
});

登場人物


Observable の生成

  • Observable<T> クラス
  • Observable.create() スタティックメソッド
  • ObservableOnSubscribe<T> インタフェース
  • ObservableEmitter<T> インタフェース

Observer の生成と subscribe

  • Observer<T> インタフェース
  • Observable.subscribe() インスタンスメソッド

Observable クラス


2つの役割

  • Observable インスタンスを生成するための static ファクトリーメソッド
  • Observable のインスタンスメソッド

static ファクトリーメソッド


インスタンスメソッド

  • subscribe() やメソッドチェーン
  • それぞれのインスタンスメソッドごとに、Observable を継承したクラスがあり、それを返している
    • map() は ObservableMap 返す

Observable.create()

  • Observable の static ファクトリーメソッドの一つ

Observable.create() ソース

Observable.create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
  • RxJavaPlugin.onAssembly は通常何もしない
    • テストの実行時のインジェクトなどに利用
  • ObservableCreate を生成して返す

ObservableCreate クラス


特徴

  • Observable を継承している
  • Observable の abstract メソッドである subscribeActual() を実装している
  • source プロパティとして ObservableOnSubscribe への参照を持っている

UML


ここまでのまとめ


サンプルコード

Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override 
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
       }
    });

やってること

  • Observable.create()
    • Observable.create() は static ファクトリーメソッド
    • Observable を継承した ObservableCreate インスタンスを生成
  • ObservableCreate クラス
    • Observable を継承
    • 渡された ObservableOnSubscribe インスタンスへの参照を保持

Observable.subscribe()


サンプルコード

observable.subscribe(new Observer<Integer>() {
    @Override 
    public void onSubscribe(Disposable d) {
        System.out.println(d);
    }
    @Override 
    public void onNext(Integer integer) {
        System.out.println(integer);
    }
    @Override 
    public void onError(Throwable e) {
        System.out.println(e);
    }
    @Override 
    public void onComplete() {
       // 何も出力しない
       System.out.println();
    }
});

Observable.subscribe() コード

Observable.subscribe()
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

subscribeActual()observer を渡しているだけ


Observable.subscribeActual() ソース

Observable.subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);

⇒ abstract メソッド


ObservableCreate.subscribeActual() ソース

ObservableCreate.subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
       // source は ObservableCreate のプロパティ
       // (final ObservableOnSubscribe<T> source;)
       source.subscribe(parent);
    } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
    }
}

ObservableCreate.subscribeActula() やっていること

  • CreateEmitter<T> インスタンスを作成
    • observer を渡す
  • Observer インスタンスの onSubscribe() を呼び出す
  • source プロパティの subscribe()CreateEmitter<T> を与えて呼び出す

ObservableCreate.CreateEmitter クラス

  • ObservableEmitter<T> を実装している
    • ObservableEmitter<T>Emitter<T> を継承している
  • observer プロパティとして Observer への参照を保持している

UML


(再掲)ObservableCreate.subscribeActual() ソース

ObservableCreate.subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
       // source は ObservableCreate のプロパティ
       // (final ObservableOnSubscribe<T> source;)
       source.subscribe(parent);
    } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
    }
}

source.subscribe()

ObservableOnSubscribeの無名実装
new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
}

CreateEmitter.onNext()

CreateEmitter.onNext()
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // observer は CreateEmitter のプロパティ
        // (final Observer<? super T> observer;)
        observer.onNext(t);
    }
}

⇒ onNext をただ呼び出しているだけ


ここまでのまとめ

  • Observer.subscribe()
    • subscribeActual() を呼び出す
    • subscribeActual() は abstract メソッド
    • 今回のケースでは、実装しているのは ObservableCreateObservableCreate.subscribeActual() が呼び出される
  • ObservableCreate.subscribeActual()
    • CreateEmitter<T> を生成
    • CreateEmitter<T> は渡された Observer をラップ
    • CreateEmitter<T>.onNext() が呼び出されると、Observer.onNext() がそのまま呼び出される

わかったこと1

Observable クラスには二つの役割がある。

  • Observable インスタンスを生成するための static ファクトリーメソッドを提供
  • Observable のインスタンスメソッドを提供

わかったこと2

Observable のメソッドは、基本的に Observable の派生クラスを生成して返す。

  • 例1:Observable.create()ObservableCreate を生成
  • 例2:Observable.map()ObservableMap を生成

わかったこと3

Observable.subscribe()Observable.subscribeActual() を呼び出している。

  • subscribeActual() は abstract メソッド
  • Observable を継承したクラスが性質に応じて subscribeActual() を実装

わかったこと4

ObservableCreate.subscribeActual() は次の処理を行っている。

  • Observer をラップするの CreateEmitter<T> を生成
  • ObservableOnSubscribe.subscribe() にその CreateEmitter<T> インスタンスを渡す
  • CreateEmitter<T>onNext()ObserveronNext() に転送

わかったこと5

このケースでは subscribe() をきっかけとして Observable が動き出す

  • ⇒ Cold な Observable である

次回


次回

  • map() などを使ったメソッドチェーンの実装について
  • Scheduler 指定の実装について(tranpoline の効果が分かるかも?)
  • Emitter を動的(?)にする方法(毎回 Observable を作らない)