Android
RxJava
RxJavaDay 8

rx.Singleについて

More than 3 years have passed since last update.


TR;DL



  • rx.Singleを使えば値が1回しか流れてこないよ


  • rxSingleは何回も値を流そうとしても何もおきないよ


  • single() operatorは2つ目以降の値が流れてきたらException吐くよ


rx.Single<T>

rx.Single<T>がRxJava 1.1.0でexperimentalからbetaに昇格した.rx.Observableの代わりに使う.APIもだいたい似たような感じだが,大きく違うのがsubscribe()

public class Single<T> {

// ...

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1407-L1430
public final Subscription subscribe(final Action1<? super T> onSuccess) {
// ...
}

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1451-L1477
public final Subscription subscribe(final Action1<? super T> onSuccess, final Action1<Throwable> onError) {
// ...
}

// ...
}

rx.Singleには第3引数にonCompleteをとるsubscribe()メソッドが生えていない.


rx.SingleSubscriber<T>

従来のrx.Subscriber<T>にはSubscriptionのメソッドに加え,rx.Observer<T>の3メソッドが用意されている.

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Subscriber.java

public abstract class Subscriber<T> implements Observer<T>, Subscription {
void onCompleted();
void onError(Throwable e);
void onNext(T t);

// ...
}

一方,rx.SingleSubscriber<T>にはSubscriptionのものを除けばメソッドが2つしか存在しない.

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/SingleSubscriber.java

public abstract class SingleSubscriber<T> implements Subscription {
public abstract void onSuccess(T value);
public abstract void onError(Throwable error);

// ...
}


onSuccess()は何回呼べる?

1回(2回目以降も実行はできるけど何も起きない).

@Test

public void testSingle() throws Exception {
final CountDownLatch latch = new CountDownLatch(3);

Single.create(
subscriber -> {
subscriber.onSuccess(0);
subscriber.onSuccess(1);
subscriber.onSuccess(2);
})
.subscribe(_v -> latch.countDown());

latch.await(100, TimeUnit.MILLISECONDS);

assertThat(latch.getCount(), is(2L));
}


何に使う?


  • 一度しか値が流れないようなもの

  • Promise的な運用


single operatorとの使い分けは?

operatorの方のsingle()は2つ目以降の値が流れてきたらException吐いてくる.2つ目以降の値は流れてきてはいけないものならばoperatorの方のsingle()を使えばいいのかな?

public final class OperatorSingle<T> implements Operator<T, T> {

// ...

private static final class ParentSubscriber<T> extends Subscriber<T> {
// ...

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/internal/operators/OperatorSingle.java#L106-L116
@Override
public void onNext(T value) {
if (isNonEmpty) {
hasTooManyElements = true;
child.onError(new IllegalArgumentException("Sequence contains too many elements"));
unsubscribe();
} else {
this.value = value;
isNonEmpty = true;
}
}

// ...
}
}


その他

toObservable()したらいつものrx.Observableが返ってくる.

public class Single<T> {

// ...

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L248-L251
private static <T> Observable<T> asObservable(Single<T> t) {
return Observable.create(t.onSubscribe);
}

// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1694-L1696
public final Observable<T> toObservable() {
return asObservable(this);
}
}


References