この記事は何?
お題を取り上げて RxJava2 のソースを読むという記事です。
ReactiveX の基本的な概念や RxJava の使い方をすでに知っている方を対象としています。
バージョン
本記事で RxJava と書く場合は RxJava 2.x を指します。
RxJava 1.x ではないので注意してください。
解説に使うソースコードのバージョンは 2.1.3 です。
また、最新の RxJava 2.x ではコードが変更されている可能性があります。
お題
前回のお題は Observable.create() と Observable.subscribe() について でした。
今回は、その中で登場した RxJavaPlugins について説明します。
実際に RxJavaPlugins が登場したコードは次の通りです。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Observable インスタンスを作成する static ファクトリーメソッドの中で RxJavaPlugins が使われていました。
ソースコード
全体
まず RxJavaPlugins のクラスの先頭を眺めてみます。
/**
* Utility class to inject handlers to certain standard RxJava operations.
*/
public final class RxJavaPlugins {
...
/** Helper class, no instances. */
private RxJavaPlugins() {
throw new IllegalStateException("No instances!");
}
}
Javadoc のコメントから、RxJavaPlugins は各種ハンドラを inject する Utility クラスであることが分かります。
また、Utility クラスなので private コンストラクタが定義されています。
onAssembly メソッド
onAssembly は次のようなコードになっています。
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Maybe<T> onAssembly(@NonNull Maybe<T> source) {
Function<? super Maybe, ? extends Maybe> f = onMaybeAssembly; // (1)
if (f != null) {
return apply(f, source); // (2)
}
return source; // (3)
}
特に難しいことはしていません。
- static field の onMaybeAssembly プロパティをローカル変数に代入
- null 以外なら source に apply() して返す
- そうでなければ source をそのまま返す
という感じです。
static field の onMayAssembly は次のように定義されています。
@SuppressWarnings("rawtypes")
@Nullable
static volatile Function<? super Maybe, ? extends Maybe> onMaybeAssembly;
特に変わったところはありません。
メモリ可視性を保証するために volatile がつけられていることくらいです。
また、onMaybeAssembly には対応する getter と setter が定義されています。
これは割愛します。
apply メソッド
/**
* Wraps the call to the function in try-catch and propagates thrown
* checked exceptions as RuntimeException.
* @param <T> the input type
* @param <R> the output type
* @param f the function to call, not null (not verified)
* @param t the parameter value to the function
* @return the result of the function call
*/
@NonNull
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
try {
return f.apply(t);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
apply は、引数に関数を適用するだけのメソッドです。
ただし、発生した例外を一度キャッチして、ExceptionHandler で処理します。
ExceptionHelper クラス
ExceptionHelper クラスは RxJavaPlugins と同様に Utility クラスです。
wrapOrThrow() メソッドのコードは次の通りです。
/**
* If the provided Throwable is an Error this method
* throws it, otherwise returns a RuntimeException wrapping the error
* if that error is a checked exception.
* @param error the error to wrap or throw
* @return the (wrapped) error
*/
public static RuntimeException wrapOrThrow(Throwable error) {
if (error instanceof Error) {
throw (Error)error;
}
if (error instanceof RuntimeException) {
return (RuntimeException)error;
}
return new RuntimeException(error);
}
- 例外が Error ならスロー
- 例外が RuntimeException ならそのまま return
- 例外がチェックされる例外なら RuntimeException にラップして return
すなわち、Error 以外は必ず RuntimeException にラップされるということです。
まとめ
これらを踏まえると、最初に見た次のコードは、テストなどで handler が inject されていたらそれを利用するというコードだったことが分かります。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
// inject されていたらその handler を利用して ObservableCreate を変換
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
補足
RxJava 1.x では RxJavaHook というクラスと RxJavaPlugins が併用されていたようです。1
しかし、RxJava 2.x になって RxJavaPlugins だけに統合されました。2