takeUntil
in RxJava:
Observable<T> takeUntil(final Func1<? super T, Boolean> stopPredicate) {}
For example:
Observable.range(1, 10).takeUntil(i -> i > 5);
But takeUntil
in RxJs:
.takeUntil(Observable); // It's connectable Observable not stopPredicate
Immediately, I just wrote takeWhile()
for rx.js
rx.js:
observableProto.takeWhile = function(whileFunc) {
var source = this;
return new AnonymousObservable(function(observer) {
return source.subscribe(function(o) {
if (whileFunc(o)) {
observer.onNext(o);
} else {
observer.onCompleted();
}
}, function(e) {
observer.onError(e);
}, function() {
observer.onCompleted();
});
});
};
After, I found rx.js already has takeWhile()
, sad..