RxJava
RxJava2

RxJava2 ソースコードリーディング(2) - RxJavaPlugins -

More than 1 year has passed since last update.

この記事は何?

お題を取り上げて RxJava2 のソースを読むという記事です。
ReactiveX の基本的な概念や RxJava の使い方をすでに知っている方を対象としています。

バージョン

本記事で RxJava と書く場合は RxJava 2.x を指します。
RxJava 1.x ではないので注意してください。

解説に使うソースコードのバージョンは 2.1.3 です。
また、最新の RxJava 2.x ではコードが変更されている可能性があります。

お題

前回のお題は Observable.create() と Observable.subscribe() について でした。
今回は、その中で登場した RxJavaPlugins について説明します。

実際に RxJavaPlugins が登場したコードは次の通りです。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

ref: Observable.java#L10693

Observable インスタンスを作成する static ファクトリーメソッドの中で RxJavaPlugins が使われていました。

ソースコード

全体

まず RxJavaPlugins のクラスの先頭を眺めてみます。

/**
 * Utility class to inject handlers to certain standard RxJava operations.
 */
public final class RxJavaPlugins {
   ...


    /** Helper class, no instances. */
    private RxJavaPlugins() {
        throw new IllegalStateException("No instances!");
    }
}

ref: RxJavaPlugins.java#L34

Javadoc のコメントから、RxJavaPlugins は各種ハンドラを inject する Utility クラスであることが分かります。
また、Utility クラスなので private コンストラクタが定義されています。

onAssembly メソッド

onAssembly は次のようなコードになっています。

/**
 * Calls the associated hook function.
 * @param <T> the value type
 * @param source the hook's input value
 * @return the value returned by the hook
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Maybe<T> onAssembly(@NonNull Maybe<T> source) {
    Function<? super Maybe, ? extends Maybe> f = onMaybeAssembly; // (1)
    if (f != null) {
        return apply(f, source); // (2)
    }
    return source; // (3)
}

ref: RxJavaPlugins.java#L983

特に難しいことはしていません。

  1. static field の onMaybeAssembly プロパティをローカル変数に代入
  2. null 以外なら source に apply() して返す
  3. そうでなければ source をそのまま返す

という感じです。

static field の onMayAssembly は次のように定義されています。

@SuppressWarnings("rawtypes")
@Nullable
static volatile Function<? super Maybe, ? extends Maybe> onMaybeAssembly;

ref: RxJavaPlugins.java#L83

特に変わったところはありません。
メモリ可視性を保証するために volatile がつけられていることくらいです。

また、onMaybeAssembly には対応する getter と setter が定義されています。
これは割愛します。

apply メソッド

/**
 * Wraps the call to the function in try-catch and propagates thrown
 * checked exceptions as RuntimeException.
 * @param <T> the input type
 * @param <R> the output type
 * @param f the function to call, not null (not verified)
 * @param t the parameter value to the function
 * @return the result of the function call
 */
@NonNull
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
    try {
        return f.apply(t);
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}

ref: RxJavaPlugins.java#L1249

apply は、引数に関数を適用するだけのメソッドです。
ただし、発生した例外を一度キャッチして、ExceptionHandler で処理します。

ExceptionHelper クラス

ExceptionHelper クラスは RxJavaPlugins と同様に Utility クラスです。

wrapOrThrow() メソッドのコードは次の通りです。

/**
 * If the provided Throwable is an Error this method
 * throws it, otherwise returns a RuntimeException wrapping the error
 * if that error is a checked exception.
 * @param error the error to wrap or throw
 * @return the (wrapped) error
 */
public static RuntimeException wrapOrThrow(Throwable error) {
    if (error instanceof Error) {
        throw (Error)error;
    }
    if (error instanceof RuntimeException) {
        return (RuntimeException)error;
    }
    return new RuntimeException(error);
}

ref: ExceptionHelper.java#L38

  • 例外が Error ならスロー
  • 例外が RuntimeException ならそのまま return
  • 例外がチェックされる例外なら RuntimeException にラップして return

すなわち、Error 以外は必ず RuntimeException にラップされるということです。

まとめ

これらを踏まえると、最初に見た次のコードは、テストなどで handler が inject されていたらそれを利用するというコードだったことが分かります。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    // inject されていたらその handler を利用して ObservableCreate を変換
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

補足

RxJava 1.x では RxJavaHook というクラスと RxJavaPlugins が併用されていたようです。1
しかし、RxJava 2.x になって RxJavaPlugins だけに統合されました。2