TR;DL
-
rx.Single
を使えば値が1回しか流れてこないよ -
rxSingle
は何回も値を流そうとしても何もおきないよ -
single()
operatorは2つ目以降の値が流れてきたらException吐くよ
rx.Single<T>
rx.Single<T>がRxJava 1.1.0でexperimentalからbetaに昇格した.rx.Observable
の代わりに使う.APIもだいたい似たような感じだが,大きく違うのがsubscribe()
.
public class Single<T> {
// ...
// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1407-L1430
public final Subscription subscribe(final Action1<? super T> onSuccess) {
// ...
}
// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1451-L1477
public final Subscription subscribe(final Action1<? super T> onSuccess, final Action1<Throwable> onError) {
// ...
}
// ...
}
rx.Single
には第3引数にonComplete
をとるsubscribe()
メソッドが生えていない.
rx.SingleSubscriber<T>
従来のrx.Subscriber<T>
にはSubscription
のメソッドに加え,rx.Observer<T>
の3メソッドが用意されている.
// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Subscriber.java
public abstract class Subscriber<T> implements Observer<T>, Subscription {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
// ...
}
一方,rx.SingleSubscriber<T>
にはSubscription
のものを除けばメソッドが2つしか存在しない.
// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/SingleSubscriber.java
public abstract class SingleSubscriber<T> implements Subscription {
public abstract void onSuccess(T value);
public abstract void onError(Throwable error);
// ...
}
onSuccess()は何回呼べる?
1回(2回目以降も実行はできるけど何も起きない).
@Test
public void testSingle() throws Exception {
final CountDownLatch latch = new CountDownLatch(3);
Single.create(
subscriber -> {
subscriber.onSuccess(0);
subscriber.onSuccess(1);
subscriber.onSuccess(2);
})
.subscribe(_v -> latch.countDown());
latch.await(100, TimeUnit.MILLISECONDS);
assertThat(latch.getCount(), is(2L));
}
何に使う?
- 一度しか値が流れないようなもの
- Promise的な運用
single operatorとの使い分けは?
operatorの方のsingle()
は2つ目以降の値が流れてきたらException
吐いてくる.2つ目以降の値は流れてきてはいけないものならばoperatorの方のsingle()
を使えばいいのかな?
public final class OperatorSingle<T> implements Operator<T, T> {
// ...
private static final class ParentSubscriber<T> extends Subscriber<T> {
// ...
// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/internal/operators/OperatorSingle.java#L106-L116
@Override
public void onNext(T value) {
if (isNonEmpty) {
hasTooManyElements = true;
child.onError(new IllegalArgumentException("Sequence contains too many elements"));
unsubscribe();
} else {
this.value = value;
isNonEmpty = true;
}
}
// ...
}
}
その他
toObservable()
したらいつものrx.Observable
が返ってくる.
public class Single<T> {
// ...
// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L248-L251
private static <T> Observable<T> asObservable(Single<T> t) {
return Observable.create(t.onSubscribe);
}
// https://github.com/ReactiveX/RxJava/blob/1.1.0/src/main/java/rx/Single.java#L1694-L1696
public final Observable<T> toObservable() {
return asObservable(this);
}
}