LoginSignup
3
4

More than 3 years have passed since last update.

RxRelayを利用してデータのブリッジを行う

Posted at

目的

ある定常的に変化する値を常に購読しておきたい場合において、ObservableではOnErrorが発生してしまった場合に購読が終了してしまう。
OnErrorが発生しようとも継続して購読しておきたいときはRxRelayを利用する。

RxRelay

Jake神が作ったライブラリ
https://github.com/JakeWharton/RxRelay

導入方法

build.gradleに以下の文を追加

implementation 'com.jakewharton.rxrelay2:rxrelay:2.1.1'

実装

実装方法はBehaviorSubject等とほとんど同じ。

val dataStream = BehaviorRelay.create<Data>()

BehaviorSubjectと異なる点は、onNextの代わりにacceptを利用すること

dataStream.accept(Data(1))

3種類提供されていて以下の違いがある。
BehaviorRelay :購読開始前に流れてきた最後のイベントを、購読開始時に受け取れる
PublishRelay :購読開始前に流れてきたイベントは受け取らない
ReplayRelay :購読開始前に流れてきたイベントは全て受け取る

onNextとacceptの違い

ほとんど同じ内容だが、onNextの場合はterminalEventがnull出ない場合に処理を終了している。

    @Override
    public void accept(T value) {
        if (value == null) throw new NullPointerException("value == null");

        setCurrent(value);
        for (BehaviorDisposable<T> bs : subscribers.get()) {
            bs.emitNext(value, index);
        }
    }
    @Override
    public void onNext(T t) {
        ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");

        if (terminalEvent.get() != null) {
            return;
        }
        Object o = NotificationLite.next(t);
        setCurrent(o);
        for (BehaviorDisposable<T> bs : subscribers.get()) {
            bs.emitNext(o, index);
        }
    }

terminalEventは想像の通り、onError もしくは onCompleteで設定される。


    @Override
    public void onError(Throwable t) {
        ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        if (!terminalEvent.compareAndSet(null, t)) {
            RxJavaPlugins.onError(t);
            return;
        }
        Object o = NotificationLite.error(t);
        for (BehaviorDisposable<T> bs : terminate(o)) {
            bs.emitNext(o, index);
        }
    }

    @Override
    public void onComplete() {
        if (!terminalEvent.compareAndSet(null, ExceptionHelper.TERMINATED)) {
            return;
        }
        Object o = NotificationLite.complete();
        for (BehaviorDisposable<T> bs : terminate(o)) {
            bs.emitNext(o, index);  // relaxed read okay since this is the only mutator thread
        }
    }

なのでRelayではonErrorが発生しようが購読が継続されるよう。

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4