RxJava
RxJava2

RxJava2 ソースコードリーディング(1) - Observable.create() と Observable.subscribe() -

More than 1 year has passed since last update.

この記事は何?

単純なサンプルコードを使って RxJava2 の実装について説明しています。
ReactiveX の基本的な概念や RxJava の使い方をすでに知っている方を対象としています。

バージョン

本記事で RxJava と書く場合は RxJava 2.x を指します。
RxJava 1.x ではないので注意してください。

解説に使うソースコードのバージョンは 2.0.7 です。
また、最新の RxJava 2.x ではコードが変更されている可能性があります。

サンプルコード

解説に利用するサンプルコードは次のとおりです。

サンプルコード
// Observable の作成
Observable<Integer> observable =
    Observable.create(e -> {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    });

// Observable のアイテムをコンソールに出力
observable.subscribe(
    System.out::println,
    System.out::println,
    System.out::println,
    System.out::println
);

Observable.create() を使い、三つのアイテムを発行する Observable を作成しています。
そして、発行されたアイテムそれぞれを出力するように subscribe しています。

真のサンプルコード

さきほどのサンプルコードは Java 8 のラムダ記法を使って、オブジェクトを生成するコードを省略しています。

実際にどのようなオブジェクトが生成されているかを確認するため、省略されている箇所をすべて明示的にします。

オブジェクトの生成を展開したサンプルコード
// Observer の生成
Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override 
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
       }
    });

// Observer の生成と subscribe
observable.subscribe(new Observer<Integer>() {
    @Override 
    public void onSubscribe(Disposable d) {
        System.out.println(d);
    }
    @Override 
    public void onNext(Integer integer) {
        System.out.println(integer);
    }
    @Override 
    public void onError(Throwable e) {
        System.out.println(e);
    }
    @Override 
    public void onComplete() {
       // 何も出力しない
       System.out.println();
    }
});

Observable の生成では次のものが登場しています。

  • Observable<T> クラス
  • Observable.create() スタティックメソッド
  • ObservableOnSubscribe<T> インタフェース
  • ObservableEmitter<T> インタフェース

Observer の生成と subscribe では次のものが登場します。

  • Observer<T> インタフェース
  • Observable.subscribe() インスタンスメソッド

それではそれぞれの役割について見ていきます。

Observable クラス

RxJava の Observable クラスは次の二つの役割を担っています。

  • Observable インスタンスを生成するための static ファクトリーメソッドを提供
  • ObservableObservable たらしめるためのインスタンスメソッドを提供

Observableコードは 13,500 行ほどあります。
しかし、ほとんどが Javadoc であり処理として大したことはしていません。

static ファクトリーメソッド

Observable クラスのコードの上部では Observable インスタンスを生成する static ファクトリーメソッドがいくつも定義されています。サンプルコードで利用している Observable.create() もこれらのうちの一つです。

どのメソッドを使って Observable を使うべきかは A Decision Tree of Observable Operators の "I want to create a new Observable" が参考になります。

インスタンスメソッド

Observable クラスのコードの下部では、いくつものインスタンスメソッドが定義されています。サンプルコードで利用している subscribe() や、Observable をチェーンするための各種 Operator が用意されています。

様々なインスタンスメソッドがありますが、基本的には次のように実装されています。

それぞれのインスタンスメソッドごとに Observable を継承したクラスがある。それぞれのメソッドでは、対応する実装クラスのインスタンスを生成して返す

たとえば、Observable.map() では Observable を継承した ObservableMap<T> というクラスが用意されています。Observable.map() ではそのインスタンスを生成して、返しています。

Observable.map()
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    // ObservableMap のインスタンスを生成して、それをそのまま返している
    // (RxJavaPlugins.onAssembly() は通常何もしないので無視して OK)
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); 
}

ref: Observable.java#L8335

Observable.create()

ここでは Observable.create() が何をしているかを説明します。
Observable.create() のコードは次のようになっています。

Observable.create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

ref: Observable.java#L1420

引数チェックのメソッドなどを無視すると、次のような処理をしていることが分かります。

  1. ObservableOnSubscribe インスタンスを受け取り、
  2. ObservableCreate インスタンスを生成し、
  3. RxJavaPlugins.onAssembly() を呼び出し、返り値を返す

最後の RxJavaPlugins.onAssembly() は、通常は単に引数の値を返すだけです。そのため、ここでは無視してかまいません(RxJavaPlugins はテストなどで、動的に値をインジェクトしたい場合に利用します)。

すなわち Observable.create() は単に ObservableCreate インスタンスを生成して、返しているだけです。

ObservableCreate クラス

ObservableCreate は次の特徴があります。

  • Observable を継承している
  • Observable の abstract メソッドである subscribeActual() を実装している
  • source プロパティとして ObservableOnSubscribe への参照を持っている

UML にすると次のようになります(メソッドの戻り値、引数、言及しない要素については省略しています)。

今回の場合、ObservableOnSubscribe インスタンスはサンプルコードで作った無名クラスのものです。
すなわち、ObservableCreate はサンプルコード上で定義されたクラスのインスタンスを持っていることになります。

ObservableOnSubsribeの無名実装
// 無名クラスの定義
new ObservableOnSubscribe<Integer>() {
    @Override 
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
   }
}

ここまでのまとめ

ここまでで Observable の生成に関する説明は終わりです。

サンプルコードは次のとおりでした。

サンプルコード(Observerの生成)
Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override 
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
       }
    });

このコードは次のことをしていました。

  • Observable.create() では、Observable を継承した ObservableCreate インスタンスを生成。
    • Observable.create() は static ファクトリーメソッド
  • ObservableCreate は、渡された ObservableOnSubscribe インスタンスへの参照を保持
    • ObservableOnSubscribe インスタンスの実体はサンプルコード上で定義した無名クラスインスタンス
  • RxJavaPlugins.onAssembly は通常何もしない(今回は無視)

Observable.subscribe()

次に、Observable.subscribe() が何をしているのかを説明します。
Observable.subscribe() のコードは次のようになっています。

Observable.subscribe()
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

ref: Observable.java#L10693

引数チェックやエラー処理などを無視すると、次のような処理をしていることが分かります。

  1. RxJavaPlugins.onSubscribe() を呼び、
  2. subscribeActual() に引数の Observer インスタンスを与えて呼び出す

例のごとく、最初の RxJavaPlugins.onSubscribe() は単に引数の値を返すだけです。ここでは無視してかまいません。

それでは subscribeActual() について詳しく見ていきます。

Observable.subscribeActual()

Observable.subscribeActual() は abstract メソッドです。

Observable.subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);

ref: Observable.java#L10722

Observable を継承したクラスがそれぞれの性質に応じて実装しています。
今回のケースでは ObversableCreatesubscribeActual() を実装しています。

ObservableCreate.subscribeActual()

ObservableCreate.subscribeActual() のコードは次のとおりです。

ObservableCreate.subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
       // source は ObservableCreate のプロパティ
       // (final ObservableOnSubscribe<T> source;)
       source.subscribe(parent);
    } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
    }
}

ref: ObservableCreate.java#L35

ObservableCreate.subscribeActual() では次の処理をしていることが分かります。

  1. CreateEmitter<T> インスタンスを作成し、
  2. 引数の Observer インスタンスの onSubscribe() を呼び出し、
  3. ObservableOnSubscribe インスタンス(source プロパティで参照)の subscribe()CreateEmitter<T> インスタンスを与えて呼び出す

Observer のコールバックの一つである onSubscribe() はこのタイミングで呼び出されていることが分かります。

次に CreateEmitter<T> について詳しく見ていきます。

ObservableCreate.CreateEmitter クラス

CreateEmitter<T>ObservableCreate の static inner クラスであり、次のような特徴を持つクラスです。

  • ObservableEmitter<T> を実装している
    • ObservableEmitter<T>Emitter<T> を継承している
  • observer プロパティとして Observer への参照を保持している

UML にすると次のようになります(メソッドの戻り値、引数、言及しない要素については省略しています)。

作成された CreateEmitter<T> インスタンスは ObservableOnSubscribe.subscribe() に渡されています。

この subscribe() はサンプルコードで無名クラスとして定義されていました。

ObservableOnSubscribeの無名実装
new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
}

今回のケースでは、このメソッドの引数の ObservableEmitter の実体は CreateEmitter<T> となっています。

この subscribe() の実装では onNext() を3回呼んでいます。
CreateEmitteronNext() のソースコードを見てみます。

CreateEmitter.onNext()

CreateEmitter.onNext() のコードは次のとおりです。

CreateEmitter.onNext()
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // observer は CreateEmitter のプロパティ
        // (final Observer<? super T> observer;)
        observer.onNext(t);
    }
}

ref: ObservableCreate.java#L61

CreateEmitter.onNext() では次のことをしているのが分かります。

  1. 値の null チェックをし、
  2. Observer インスタンス(obsever プロパティで参照)の onNext() を呼び出す。

この Observer インスタンスはサンプルコードで定義した無名クラスです。

Observerの無名実装
new Observer<Integer>() {
    @Override 
    public void onSubscribe(Disposable d) {
        System.out.println(d);
    }
    @Override 
    public void onNext(Integer integer) {
        // ここが呼び出される
        System.out.println(integer);
    }
    @Override 
    public void onError(Throwable e) {
        System.out.println(e);
    }
    @Override 
    public void onComplete() {
       System.out.println();
    }
}

CreateEmitter<T>onNext()ObserveronNext() を呼んでいるだけでした。
すなわち、CreateEmitter<T>Observer をラップして、呼び出しを転送しているだけの存在ともいえます。

この後の onNext(2), onNext(3), onComplete() 呼び出しに関しても上記と同様です。
ラップしている Observer に呼び出しを転送しているだけです。

ここまでのまとめ

これで Observer の生成と subscribe() の説明は終わりです。

サンプルコードは次のとおりです。

// Observer の生成と subscribe
observable.subscribe(new Observer<Integer>() {
    @Override 
    public void onSubscribe(Disposable d) {
        System.out.println(d);
    }
    @Override 
    public void onNext(Integer integer) {
        System.out.println(integer);
    }
    @Override 
    public void onError(Throwable e) {
        System.out.println(e);
    }
    @Override 
    public void onComplete() {
       // 何も出力しない
       System.out.println();
    }
});

このコードは実際には次のことをしていました。

  • Observer.subscribe() はインスタンスメソッド
    • subscribeActual() を呼び出す
    • subscribeActual() は abstract メソッド
    • 今回のケースでは、実装しているのは ObservableCreateObservableCreate.subscribeActual() が呼び出される
  • ObservableCreate.subscribeActual()
    • CreateEmitter<T> を生成する
    • CreateEmitter<T> は渡された Observer をラップしているだけ
    • CreateEmitter<T>.onNext() が呼び出されると、Observer.onNext() がそのまま呼び出される

わかったことのまとめ

今回の実装調査を通して分かったことをまとめます。

Observable クラスには二つの役割があります。

  • Observable インスタンスを生成するための static ファクトリーメソッドを提供
  • ObservableObservable たらしめるためのインスタンスメソッドを提供

Observable のメソッドは、基本的に Observable の派生クラスを生成して返します。

  • 例1:Observable.create()ObservableCreate を生成
  • 例2:Observable.map()ObservableMap を生成

Observable.subscribe()Observable.subscribeActual() を呼び出しています。

  • subscribeActual() は abstract メソッド
  • Observable を継承したクラスが性質に応じて subscribeActual() を実装

ObservableCreate.subscribeActual() は次の処理を行っています。

  • Observer をラップするの CreateEmitter<T> を生成
  • ObservableOnSubscribe.subscribe() にその CreateEmitter<T> インスタンスを渡す
  • CreateEmitter<T>onNext()ObserveronNext() に転送