47
39

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

TR;DL

  • rx.Singleを使えば値が1回しか流れてこないよ
  • rxSingleは何回も値を流そうとしても何もおきないよ
  • single() operatorは2つ目以降の値が流れてきたらException吐くよ

rx.Single<T>

rx.Single<T>がRxJava 1.1.0でexperimentalからbetaに昇格した.rx.Observableの代わりに使う.APIもだいたい似たような感じだが,大きく違うのがsubscribe()

public class Single<T> {
    // ...

    // https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1407-L1430
    public final Subscription subscribe(final Action1<? super T> onSuccess) {
      // ...
    }

    // https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1451-L1477
    public final Subscription subscribe(final Action1<? super T> onSuccess, final Action1<Throwable> onError) {
        // ...
    }

    // ...
}

rx.Singleには第3引数にonCompleteをとるsubscribe()メソッドが生えていない.

rx.SingleSubscriber<T>

従来のrx.Subscriber<T>にはSubscriptionのメソッドに加え,rx.Observer<T>の3メソッドが用意されている.

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Subscriber.java
public abstract class Subscriber<T> implements Observer<T>, Subscription {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);

    // ...
}

一方,rx.SingleSubscriber<T>にはSubscriptionのものを除けばメソッドが2つしか存在しない.

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/SingleSubscriber.java
public abstract class SingleSubscriber<T> implements Subscription {
    public abstract void onSuccess(T value);
    public abstract void onError(Throwable error);

    // ...
}

onSuccess()は何回呼べる?

1回(2回目以降も実行はできるけど何も起きない).

@Test
public void testSingle() throws Exception {
    final CountDownLatch latch = new CountDownLatch(3);

    Single.create(
            subscriber -> {
                subscriber.onSuccess(0);
                subscriber.onSuccess(1);
                subscriber.onSuccess(2);
            })
            .subscribe(_v -> latch.countDown());

    latch.await(100, TimeUnit.MILLISECONDS);

    assertThat(latch.getCount(), is(2L));
}

何に使う?

  • 一度しか値が流れないようなもの
  • Promise的な運用

single operatorとの使い分けは?

operatorの方のsingle()は2つ目以降の値が流れてきたらException吐いてくる.2つ目以降の値は流れてきてはいけないものならばoperatorの方のsingle()を使えばいいのかな?

public final class OperatorSingle<T> implements Operator<T, T> {
    // ...

    private static final class ParentSubscriber<T> extends Subscriber<T> {
        // ...

        // https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/internal/operators/OperatorSingle.java#L106-L116
        @Override
        public void onNext(T value) {
            if (isNonEmpty) {
                hasTooManyElements = true;
                child.onError(new IllegalArgumentException("Sequence contains too many elements"));
                unsubscribe();
            } else {
                this.value = value;
                isNonEmpty = true;
            }
        }

        // ...
    }
}

その他

toObservable()したらいつものrx.Observableが返ってくる.

public class Single<T> {
    // ...

    // https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L248-L251
    private static <T> Observable<T> asObservable(Single<T> t) {
        return Observable.create(t.onSubscribe);
    }

    // https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1694-L1696
    public final Observable<T> toObservable() {
    	return asObservable(this);
    }
}

References

47
39
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
47
39

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?