並列で走らせた非同期処理を、最終的には同期を取って次の処理に進みたい場合あるよね。
アプリだと、起動時のマスターデータの読み込みとか、他にはバナーとリストとインフィード広告の読み込みとか。
RxにはObservable.zipっていう機能があります。
複数のストリームが全部終わるの待てる仕組み。
引数に渡すObservable型が違っていても問題ないってのがナイス!
なので、マスターデータの待ち合わせなんかも楽ちん。
マスターデータの取得例
Observable.zip(
ApiUtil.getPrefecturesObservable(userDataManager.getmUserId()),
ApiUtil.getCitiesObservable(userDataManager.getmUserId()),
ApiUtil.getMiddleCategoriesObservable(userDataManager.getmUserId()),
new Func3<Prefectures, Cities, MiddleCategories, String>() {
@Override
public String call(Prefectures prefectures,
Cities cities,
MiddleCategories middleCategories) {
// データ更新に失敗した場合
if (prefectures.result == null || prefectures.result.size() == 0) {
return FAILED_GET_MASTER_DATA;
}
if (cities.result == null || cities.result.size() == 0) {
return FAILED_GET_MASTER_DATA;
}
if (middleCategories.result == null || middleCategories.result.size() == 0) {
return FAILED_GET_MASTER_DATA;
}
// 各種データのDB保存(データ取得が全て正常な場合のみ保存する)
new PrefectureDao(StartActivity.this).init(prefectures.result);
new CityDao(StartActivity.this).init(cities.result);
new MiddleCategoryDao(StartActivity.this).init(middleCategories.result);
new ArticleViewHistoryDao(StartActivity.this).init();
return SUCCESS_GET_MASTER_DATA;
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.compose(HogeActivity.this.<String>bindToLifecycle())
.subscribe(new ApiObserver<String>(StartActivity.this){
@Override
public void onError(Throwable e) {
handleError(e);
// エラー処理
}
@Override
public void onNext(String result) {
if (result.equals(SUCCESS_GET_MASTER_DATA)) {
// データ完全時の処理
} else {
// データ不完全時の処理
// 全体のやり直しな感じ
}
}
});
シリアルに投げてた通信を並列でやれるから、時間短縮になるのはもちろん、途中で失敗した場合はロールバックみたいな、トランザクション的な書き方も簡単。