Edited at
RxJavaDay 19

Notificationとmaterialize()について調べてみた

More than 3 years have passed since last update.

RxJava Advent Calendar 2015 19日目の記事です。

RxJava/src/main/java/rx/の中を覗いていると、Notificationというクラスがあることに気付きました。

このクラスだけ全く見た事がなかったので、少し調べて見ました。


Notificationの生成方法

NotificationはObservable#materialize()メソッドで生成できるようです。

具体的には、materialize()Observable<T>を引数に取り、Observable<Notification<T>>を返します。

ちなみに、materialize()の対を成すdematerialize()というメソッドもあり、これはObservable<Notification<T>>からObservable<T>を生成します。


何ができるの?

materialize()の効果は、一言で言うと「Observableの出力をNotificationで包み、OnNext, OnError, OnCompletedを1箇所で扱うようにできるもの」のようです。


使ってみる

早速使ってみましょう。例として出力される値をロギングします。

比較として、まずはmaterialize()やNotificationを使わない場合を見ます。

(型を明示するためにラムダ式を使わず書いています。)

String[] items = {"foo", "bar", "baz"};

Observable.from(items).subscribe(new Action1<String>() {
@Override
public void call(String s) {
log("onNext: " + s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
log("onError");
}
}, new Action0() {
@Override
public void call() {
log("onCompleted");
}
});

onNext(), onError(), onCompleted()の3箇所全てでロギングのための記述をしています。

出力は以下のようになります。

onNext: foo

onNext: bar
onNext: baz
onCompleted

ではmaterialize()とNotificationを使ってみましょう。

String[] items = {"foo", "bar", "baz"};

Observable.from(items).materialize().subscribe(new Action1<Notification<String>>() {
@Override
public void call(Notification<String> stringNotification) {
log(stringNotification.toString());
}
});

materialize()Observable<Notification<String>>を返します。

これをsubscribeしてロギングすると、以下のように出力されます。

[rx.Notification@133dcd74 OnNext foo]

[rx.Notification@133dbcc1 OnNext bar]
[rx.Notification@133dbcc9 OnNext baz]
[rx.Notification@df4db0e OnCompleted]

1箇所でロギングしただけでOnCompleted()の実行まで確認することができました。


OnNext, OnError, OnCompletedを区別する

また、Notificationはkindというenumのメンバ変数を持っています。

public static enum Kind {

OnNext, OnError, OnCompleted
}

これを使うことで、OnNext, OnError, OnCompletedを区別し、別々の処理をすることもできます。

String[] items = {"foo", "bar", "baz"};

Observable.from(items).materialize().subscribe(new Action1<Notification<String>>() {
@Override
public void call(Notification<String> stringNotification) {
// 共通の処理

switch (stringNotification.getKind()) {
case OnNext:
// OnNext用の処理
break;
case OnError:
// OnError用の処理
break;
case OnCompleted:
// OnCompleted用の処理
break;
}
}
});

個別に区別するためにisOnNext(), isOnError(), isOnCompleted()も用意されています。


値やThrowableを取得する

getValue()でOnNextの値を、getThrowable()でOnErrorのthrowableを取得することもできます。

String[] items = {"foo", "bar", "baz"};

Observable.from(items).materialize().subscribe(new Action1<Notification<String>>() {
@Override
public void call(Notification<String> stringNotification) {
log(stringNotification.getKind() + " is " + stringNotification.getValue());
}
});

OnNext is foo

OnNext is bar
OnNext is baz
OnCompleted is null

OnCompletedは値を持たないのでnullが出力されました。


accept()メソッド

accept(Observer<? super T> observer)というメソッドもあります。

引数にObserverを取るようなので、以下のように書いてみます。

String[] items = {"foo", "bar", "baz"};

Observable.from(items).materialize().subscribe(new Action1<Notification<String>>() {
@Override
public void call(Notification<String> stringNotification) {
stringNotification.accept(new Observer<String>() {
@Override
public void onCompleted() {
log("accept: onCompleted");
}

@Override
public void onError(Throwable e) {
log("accept: onError");
}

@Override
public void onNext(String s) {
log("accept: onNext, " + s);
}
});
}
});

accept: OnNext, foo

accept: OnNext, bar
accept: OnNext, baz
accept: OnCompleted

値を取り出すことができましたが、取り出すだけならgetValue()でも良いはずです。

このメソッドのJavaDocにはこうあります。


Forwards this notification on to a specified Observer.

このNotificationを指定されたObserverに転送する。


単に値を取り出すのではなく、Observerの上で扱うことに意味がある状況があるようです。

少し調べたのですが、このaccept()の使いどころは結局良く分かりませんでした・・・


まとめ

Observable#materialize()Observable<T>からObservable<Notification<T>>を生成します。

NotificationはOnNext, OnError, OnCompletedの区別なく、全てを同様に扱うことができます。

使いどころが難しそうですが、この記事で例にしたように、Observableの値がどうなっているかロギングする時や、何かしら共通の処理が多い時には役に立つかもしれません。


参考