題目の通りです。
RxJava + RetrofitでAPIをポーリングしたい。
Retrofitで書いたSingleを返すgetStatusAPIがあり、
これを叩くと状態statusが取れます。
状態が変わるまで5秒間隔でポーリングしたいのですがどうすれば良い?
自己解決したので、メモのためここに書きます。
Observable.interval() で指定した間隔で購読します。
Observable.take() は一定時間経過後に購読を停止します。
Observable.takeWhile() は条件が真のあいだ購読を継続します。
キモは Observable.takeWhile() の位置。
doOnNext() より前に書くと、状態が変わったとき、doOnNext()がよばれなくなります。
hoge.java
private void polling(String checkoutId, int interval, int timeout) {
Disposable disposable = Observable.interval(interval, TimeUnit.SECONDS)
.take(timeout, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.flatMap(repeat -> API.getStatus(checkoutId).toObservable())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(response -> {
Log.d("DBG", "repeat polling.");
if (response.status.equals("COMPLETED")) {
Toast.makeText(this, "Checkout COMPLETED!", Toast.LENGTH_SHORT).show();
}
})
.doOnError(throwable -> {
Log.d("DBG", "polling has received error.");
Toast.makeText(this, "Checkout failed.", Toast.LENGTH_SHORT).show();
})
.doOnComplete(() -> {
Log.d("DBG", "stopped polling.");
})
.takeWhile(response -> response.status.equals("PENDING")) // subscribeの直前で購読停止を判定する
.subscribe(); //
compositeDisposable.add(disposable);
}