この記事は何?
単純なサンプルコードを使って RxJava2 の実装について説明しています。
ReactiveX の基本的な概念や RxJava の使い方をすでに知っている方を対象としています。
バージョン
本記事で RxJava と書く場合は RxJava 2.x を指します。
RxJava 1.x ではないので注意してください。
解説に使うソースコードのバージョンは 2.0.7 です。
また、最新の RxJava 2.x ではコードが変更されている可能性があります。
サンプルコード
解説に利用するサンプルコードは次のとおりです。
// Observable の作成
Observable<Integer> observable =
Observable.create(e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
});
// Observable のアイテムをコンソールに出力
observable.subscribe(
System.out::println,
System.out::println,
System.out::println,
System.out::println
);
Observable.create() を使い、三つのアイテムを発行する Observable を作成しています。
そして、発行されたアイテムそれぞれを出力するように subscribe しています。
真のサンプルコード
さきほどのサンプルコードは Java 8 のラムダ記法を使って、オブジェクトを生成するコードを省略しています。
実際にどのようなオブジェクトが生成されているかを確認するため、省略されている箇所をすべて明示的にします。
// Observer の生成
Observable<Integer> observable =
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
// Observer の生成と subscribe
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(d);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onComplete() {
// 何も出力しない
System.out.println();
}
});
Observable の生成では次のものが登場しています。
-
Observable<T>
クラス -
Observable.create()
スタティックメソッド -
ObservableOnSubscribe<T>
インタフェース -
ObservableEmitter<T>
インタフェース
Observer の生成と subscribe では次のものが登場します。
-
Observer<T>
インタフェース -
Observable.subscribe()
インスタンスメソッド
それではそれぞれの役割について見ていきます。
Observable クラス
RxJava の Observable
クラスは次の二つの役割を担っています。
-
Observable
インスタンスを生成するための static ファクトリーメソッドを提供 -
Observable
をObservable
たらしめるためのインスタンスメソッドを提供
Observable
のコードは 13,500 行ほどあります。
しかし、ほとんどが Javadoc であり処理として大したことはしていません。
static ファクトリーメソッド
Observable
クラスのコードの上部では Observable
インスタンスを生成する static ファクトリーメソッドがいくつも定義されています。サンプルコードで利用している Observable.create()
もこれらのうちの一つです。
どのメソッドを使って Observable
を使うべきかは A Decision Tree of Observable Operators の "I want to create a new Observable" が参考になります。
インスタンスメソッド
Observable
クラスのコードの下部では、いくつものインスタンスメソッドが定義されています。サンプルコードで利用している subscribe()
や、Observable
をチェーンするための各種 Operator が用意されています。
様々なインスタンスメソッドがありますが、基本的には次のように実装されています。
それぞれのインスタンスメソッドごとに Observable
を継承したクラスがある。それぞれのメソッドでは、対応する実装クラスのインスタンスを生成して返す
たとえば、Observable.map()
では Observable
を継承した ObservableMap<T>
というクラスが用意されています。Observable.map()
ではそのインスタンスを生成して、返しています。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
// ObservableMap のインスタンスを生成して、それをそのまま返している
// (RxJavaPlugins.onAssembly() は通常何もしないので無視して OK)
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
Observable.create()
ここでは Observable.create()
が何をしているかを説明します。
Observable.create()
のコードは次のようになっています。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
引数チェックのメソッドなどを無視すると、次のような処理をしていることが分かります。
-
ObservableOnSubscribe
インスタンスを受け取り、 -
ObservableCreate
インスタンスを生成し、 -
RxJavaPlugins.onAssembly()
を呼び出し、返り値を返す
最後の RxJavaPlugins.onAssembly()
は、通常は単に引数の値を返すだけです。そのため、ここでは無視してかまいません(RxJavaPlugins
はテストなどで、動的に値をインジェクトしたい場合に利用します)。
すなわち Observable.create()
は単に ObservableCreate
インスタンスを生成して、返しているだけです。
ObservableCreate クラス
ObservableCreate
は次の特徴があります。
-
Observable
を継承している -
Observable
の abstract メソッドであるsubscribeActual()
を実装している -
source
プロパティとしてObservableOnSubscribe
への参照を持っている
UML にすると次のようになります(メソッドの戻り値、引数、言及しない要素については省略しています)。
今回の場合、ObservableOnSubscribe
インスタンスはサンプルコードで作った無名クラスのものです。
すなわち、ObservableCreate
はサンプルコード上で定義されたクラスのインスタンスを持っていることになります。
// 無名クラスの定義
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}
ここまでのまとめ
ここまでで Observable の生成に関する説明は終わりです。
サンプルコードは次のとおりでした。
Observable<Integer> observable =
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
このコードは次のことをしていました。
-
Observable.create()
では、Observable
を継承したObservableCreate
インスタンスを生成。-
Observable.create()
は static ファクトリーメソッド
-
-
ObservableCreate
は、渡されたObservableOnSubscribe
インスタンスへの参照を保持-
ObservableOnSubscribe
インスタンスの実体はサンプルコード上で定義した無名クラスインスタンス
-
-
RxJavaPlugins.onAssembly
は通常何もしない(今回は無視)
Observable.subscribe()
次に、Observable.subscribe()
が何をしているのかを説明します。
Observable.subscribe()
のコードは次のようになっています。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
引数チェックやエラー処理などを無視すると、次のような処理をしていることが分かります。
-
RxJavaPlugins.onSubscribe()
を呼び、 -
subscribeActual()
に引数のObserver
インスタンスを与えて呼び出す
例のごとく、最初の RxJavaPlugins.onSubscribe()
は単に引数の値を返すだけです。ここでは無視してかまいません。
それでは subscribeActual()
について詳しく見ていきます。
Observable.subscribeActual()
Observable.subscribeActual()
は abstract メソッドです。
protected abstract void subscribeActual(Observer<? super T> observer);
Observable
を継承したクラスがそれぞれの性質に応じて実装しています。
今回のケースでは ObversableCreate
が subscribeActual()
を実装しています。
ObservableCreate.subscribeActual()
ObservableCreate.subscribeActual()
のコードは次のとおりです。
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// source は ObservableCreate のプロパティ
// (final ObservableOnSubscribe<T> source;)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
ref: ObservableCreate.java#L35
ObservableCreate.subscribeActual()
では次の処理をしていることが分かります。
-
CreateEmitter<T>
インスタンスを作成し、 - 引数の
Observer
インスタンスのonSubscribe()
を呼び出し、 -
ObservableOnSubscribe
インスタンス(source
プロパティで参照)のsubscribe()
にCreateEmitter<T>
インスタンスを与えて呼び出す
Observer
のコールバックの一つである onSubscribe()
はこのタイミングで呼び出されていることが分かります。
次に CreateEmitter<T>
について詳しく見ていきます。
ObservableCreate.CreateEmitter クラス
CreateEmitter<T>
は ObservableCreate
の static inner クラスであり、次のような特徴を持つクラスです。
-
ObservableEmitter<T>
を実装している-
ObservableEmitter<T>
はEmitter<T>
を継承している
-
-
observer
プロパティとしてObserver
への参照を保持している
UML にすると次のようになります(メソッドの戻り値、引数、言及しない要素については省略しています)。
作成された CreateEmitter<T>
インスタンスは ObservableOnSubscribe.subscribe()
に渡されています。
この subscribe()
はサンプルコードで無名クラスとして定義されていました。
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}
今回のケースでは、このメソッドの引数の ObservableEmitter
の実体は CreateEmitter<T>
となっています。
この subscribe()
の実装では onNext()
を3回呼んでいます。
CreateEmitter
の onNext()
のソースコードを見てみます。
CreateEmitter.onNext()
CreateEmitter.onNext()
のコードは次のとおりです。
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// observer は CreateEmitter のプロパティ
// (final Observer<? super T> observer;)
observer.onNext(t);
}
}
ref: ObservableCreate.java#L61
CreateEmitter.onNext()
では次のことをしているのが分かります。
- 値の null チェックをし、
-
Observer
インスタンス(obsever
プロパティで参照)のonNext()
を呼び出す。
この Observer
インスタンスはサンプルコードで定義した無名クラスです。
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(d);
}
@Override
public void onNext(Integer integer) {
// ここが呼び出される
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onComplete() {
System.out.println();
}
}
CreateEmitter<T>
の onNext()
は Observer
の onNext()
を呼んでいるだけでした。
すなわち、CreateEmitter<T>
は Observer
をラップして、呼び出しを転送しているだけの存在ともいえます。
この後の onNext(2)
, onNext(3)
, onComplete()
呼び出しに関しても上記と同様です。
ラップしている Observer
に呼び出しを転送しているだけです。
ここまでのまとめ
これで Observer の生成と subscribe() の説明は終わりです。
サンプルコードは次のとおりです。
// Observer の生成と subscribe
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(d);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onComplete() {
// 何も出力しない
System.out.println();
}
});
このコードは実際には次のことをしていました。
-
Observer.subscribe()
はインスタンスメソッド-
subscribeActual()
を呼び出す -
subscribeActual()
は abstract メソッド - 今回のケースでは、実装しているのは
ObservableCreate
でObservableCreate.subscribeActual()
が呼び出される
-
-
ObservableCreate.subscribeActual()
-
CreateEmitter<T>
を生成する -
CreateEmitter<T>
は渡されたObserver
をラップしているだけ -
CreateEmitter<T>.onNext()
が呼び出されると、Observer.onNext()
がそのまま呼び出される
-
わかったことのまとめ
今回の実装調査を通して分かったことをまとめます。
Observable
クラスには二つの役割があります。
-
Observable
インスタンスを生成するための static ファクトリーメソッドを提供 -
Observable
をObservable
たらしめるためのインスタンスメソッドを提供
Observable
のメソッドは、基本的に Observable
の派生クラスを生成して返します。
- 例1:
Observable.create()
はObservableCreate
を生成 - 例2:
Observable.map()
はObservableMap
を生成
Observable.subscribe()
は Observable.subscribeActual()
を呼び出しています。
-
subscribeActual()
は abstract メソッド -
Observable
を継承したクラスが性質に応じてsubscribeActual()
を実装
ObservableCreate.subscribeActual()
は次の処理を行っています。
-
Observer
をラップするのCreateEmitter<T>
を生成 -
ObservableOnSubscribe.subscribe()
にそのCreateEmitter<T>
インスタンスを渡す -
CreateEmitter<T>
のonNext()
はObserver
のonNext()
に転送