Notificationとmaterialize()について調べてみた

  • 17
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

RxJava Advent Calendar 2015 19日目の記事です。

RxJava/src/main/java/rx/の中を覗いていると、Notificationというクラスがあることに気付きました。
このクラスだけ全く見た事がなかったので、少し調べて見ました。

Notificationの生成方法

NotificationはObservable#materialize()メソッドで生成できるようです。
具体的には、materialize()Observable<T>を引数に取り、Observable<Notification<T>>を返します。

ちなみに、materialize()の対を成すdematerialize()というメソッドもあり、これはObservable<Notification<T>>からObservable<T>を生成します。

何ができるの?

materialize()の効果は、一言で言うと「Observableの出力をNotificationで包み、OnNext, OnError, OnCompletedを1箇所で扱うようにできるもの」のようです。

使ってみる

早速使ってみましょう。例として出力される値をロギングします。
比較として、まずはmaterialize()やNotificationを使わない場合を見ます。
(型を明示するためにラムダ式を使わず書いています。)

String[] items = {"foo", "bar", "baz"};

Observable.from(items).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        log("onNext: " + s);
    }
}, new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {
        log("onError");
    }
}, new Action0() {
    @Override
    public void call() {
        log("onCompleted");
    }
});

onNext(), onError(), onCompleted()の3箇所全てでロギングのための記述をしています。

出力は以下のようになります。

onNext: foo
onNext: bar
onNext: baz
onCompleted

ではmaterialize()とNotificationを使ってみましょう。

String[] items = {"foo", "bar", "baz"};

Observable.from(items).materialize().subscribe(new Action1<Notification<String>>() {
    @Override
    public void call(Notification<String> stringNotification) {
        log(stringNotification.toString());
    }
});

materialize()Observable<Notification<String>>を返します。
これをsubscribeしてロギングすると、以下のように出力されます。

[rx.Notification@133dcd74 OnNext foo]
[rx.Notification@133dbcc1 OnNext bar]
[rx.Notification@133dbcc9 OnNext baz]
[rx.Notification@df4db0e OnCompleted]

1箇所でロギングしただけでOnCompleted()の実行まで確認することができました。

OnNext, OnError, OnCompletedを区別する

また、Notificationはkindというenumのメンバ変数を持っています。

public static enum Kind {
    OnNext, OnError, OnCompleted
}

これを使うことで、OnNext, OnError, OnCompletedを区別し、別々の処理をすることもできます。

String[] items = {"foo", "bar", "baz"};

Observable.from(items).materialize().subscribe(new Action1<Notification<String>>() {
    @Override
    public void call(Notification<String> stringNotification) {
        // 共通の処理

        switch (stringNotification.getKind()) {
            case OnNext:
                // OnNext用の処理
                break;
            case OnError:
                // OnError用の処理
                break;
            case OnCompleted:
                // OnCompleted用の処理
                break;
        }
    }
});

個別に区別するためにisOnNext(), isOnError(), isOnCompleted()も用意されています。

値やThrowableを取得する

getValue()でOnNextの値を、getThrowable()でOnErrorのthrowableを取得することもできます。

String[] items = {"foo", "bar", "baz"};

Observable.from(items).materialize().subscribe(new Action1<Notification<String>>() {
    @Override
    public void call(Notification<String> stringNotification) {
        log(stringNotification.getKind() + " is " + stringNotification.getValue());
    }
});
OnNext is foo
OnNext is bar
OnNext is baz
OnCompleted is null

OnCompletedは値を持たないのでnullが出力されました。

accept()メソッド

accept(Observer<? super T> observer)というメソッドもあります。
引数にObserverを取るようなので、以下のように書いてみます。

String[] items = {"foo", "bar", "baz"};

Observable.from(items).materialize().subscribe(new Action1<Notification<String>>() {
    @Override
    public void call(Notification<String> stringNotification) {
        stringNotification.accept(new Observer<String>() {
            @Override
            public void onCompleted() {
                log("accept: onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                log("accept: onError");
            }

            @Override
            public void onNext(String s) {
                log("accept: onNext, " + s);
            }
        });
    }
});
accept: OnNext, foo
accept: OnNext, bar
accept: OnNext, baz
accept: OnCompleted

値を取り出すことができましたが、取り出すだけならgetValue()でも良いはずです。
このメソッドのJavaDocにはこうあります。

Forwards this notification on to a specified Observer.

このNotificationを指定されたObserverに転送する。

単に値を取り出すのではなく、Observerの上で扱うことに意味がある状況があるようです。
少し調べたのですが、このaccept()の使いどころは結局良く分かりませんでした・・・

まとめ

Observable#materialize()Observable<T>からObservable<Notification<T>>を生成します。
NotificationはOnNext, OnError, OnCompletedの区別なく、全てを同様に扱うことができます。

使いどころが難しそうですが、この記事で例にしたように、Observableの値がどうなっているかロギングする時や、何かしら共通の処理が多い時には役に立つかもしれません。

参考

この投稿は RxJava Advent Calendar 201519日目の記事です。