目的
ある定常的に変化する値を常に購読しておきたい場合において、ObservableではOnErrorが発生してしまった場合に購読が終了してしまう。
OnErrorが発生しようとも継続して購読しておきたいときはRxRelayを利用する。
RxRelay
Jake神が作ったライブラリ
https://github.com/JakeWharton/RxRelay
導入方法
build.gradleに以下の文を追加
implementation 'com.jakewharton.rxrelay2:rxrelay:2.1.1'
実装
実装方法はBehaviorSubject等とほとんど同じ。
val dataStream = BehaviorRelay.create<Data>()
BehaviorSubjectと異なる点は、onNextの代わりにacceptを利用すること
dataStream.accept(Data(1))
3種類提供されていて以下の違いがある。
BehaviorRelay :購読開始前に流れてきた最後のイベントを、購読開始時に受け取れる
PublishRelay :購読開始前に流れてきたイベントは受け取らない
ReplayRelay :購読開始前に流れてきたイベントは全て受け取る
onNextとacceptの違い
ほとんど同じ内容だが、onNextの場合はterminalEventがnull出ない場合に処理を終了している。
@Override
public void accept(T value) {
if (value == null) throw new NullPointerException("value == null");
setCurrent(value);
for (BehaviorDisposable<T> bs : subscribers.get()) {
bs.emitNext(value, index);
}
}
@Override
public void onNext(T t) {
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
if (terminalEvent.get() != null) {
return;
}
Object o = NotificationLite.next(t);
setCurrent(o);
for (BehaviorDisposable<T> bs : subscribers.get()) {
bs.emitNext(o, index);
}
}
terminalEventは想像の通り、onError もしくは onCompleteで設定される。
@Override
public void onError(Throwable t) {
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
if (!terminalEvent.compareAndSet(null, t)) {
RxJavaPlugins.onError(t);
return;
}
Object o = NotificationLite.error(t);
for (BehaviorDisposable<T> bs : terminate(o)) {
bs.emitNext(o, index);
}
}
@Override
public void onComplete() {
if (!terminalEvent.compareAndSet(null, ExceptionHelper.TERMINATED)) {
return;
}
Object o = NotificationLite.complete();
for (BehaviorDisposable<T> bs : terminate(o)) {
bs.emitNext(o, index); // relaxed read okay since this is the only mutator thread
}
}
なのでRelayではonErrorが発生しようが購読が継続されるよう。