rx.Singleについて

  • 40
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

TR;DL

  • rx.Singleを使えば値が1回しか流れてこないよ
  • rxSingleは何回も値を流そうとしても何もおきないよ
  • single() operatorは2つ目以降の値が流れてきたらException吐くよ

rx.Single<T>

rx.Single<T>がRxJava 1.1.0でexperimentalからbetaに昇格した.rx.Observableの代わりに使う.APIもだいたい似たような感じだが,大きく違うのがsubscribe()

public class Single<T> {
    // ...

    // https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1407-L1430
    public final Subscription subscribe(final Action1<? super T> onSuccess) {
      // ...
    }

    // https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1451-L1477
    public final Subscription subscribe(final Action1<? super T> onSuccess, final Action1<Throwable> onError) {
        // ...
    }

    // ...
}

rx.Singleには第3引数にonCompleteをとるsubscribe()メソッドが生えていない.

rx.SingleSubscriber<T>

従来のrx.Subscriber<T>にはSubscriptionのメソッドに加え,rx.Observer<T>の3メソッドが用意されている.

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Subscriber.java
public abstract class Subscriber<T> implements Observer<T>, Subscription {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);

    // ...
}

一方,rx.SingleSubscriber<T>にはSubscriptionのものを除けばメソッドが2つしか存在しない.

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/SingleSubscriber.java
public abstract class SingleSubscriber<T> implements Subscription {
    public abstract void onSuccess(T value);
    public abstract void onError(Throwable error);

    // ...
}

onSuccess()は何回呼べる?

1回(2回目以降も実行はできるけど何も起きない).

@Test
public void testSingle() throws Exception {
    final CountDownLatch latch = new CountDownLatch(3);

    Single.create(
            subscriber -> {
                subscriber.onSuccess(0);
                subscriber.onSuccess(1);
                subscriber.onSuccess(2);
            })
            .subscribe(_v -> latch.countDown());

    latch.await(100, TimeUnit.MILLISECONDS);

    assertThat(latch.getCount(), is(2L));
}

何に使う?

  • 一度しか値が流れないようなもの
  • Promise的な運用

single operatorとの使い分けは?

operatorの方のsingle()は2つ目以降の値が流れてきたらException吐いてくる.2つ目以降の値は流れてきてはいけないものならばoperatorの方のsingle()を使えばいいのかな?

public final class OperatorSingle<T> implements Operator<T, T> {
    // ...

    private static final class ParentSubscriber<T> extends Subscriber<T> {
        // ...

        // https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/internal/operators/OperatorSingle.java#L106-L116
        @Override
        public void onNext(T value) {
            if (isNonEmpty) {
                hasTooManyElements = true;
                child.onError(new IllegalArgumentException("Sequence contains too many elements"));
                unsubscribe();
            } else {
                this.value = value;
                isNonEmpty = true;
            }
        }

        // ...
    }
}

その他

toObservable()したらいつものrx.Observableが返ってくる.

public class Single<T> {
    // ...

    // https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L248-L251
    private static <T> Observable<T> asObservable(Single<T> t) {
        return Observable.create(t.onSubscribe);
    }

    // https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1694-L1696
    public final Observable<T> toObservable() {
        return asObservable(this);
    }
}

References

この投稿は RxJava Advent Calendar 20158日目の記事です。