非同期な処理を並列させて、それぞれが完了するまで待ち合わせます。onClick()
を実行すると処理が始まります。
並列処理をただ待ち合わせる場合(戻り値不要)
mergeオペレータを使います。アイテムは何も流さずにonCompleteで処理を行います。
MainActivityViewModel.kt
class MainActivityViewModel : ViewModel(), LifecycleObserver {
private val compositeDisposable = CompositeDisposable()
fun onClick() {
Observable.merge(task1(), task2())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({}, {}, {
Timber.d("onComplete")
// do something...
})
.addTo(compositeDisposable)
}
private fun task1(): Observable<Unit> {
return Completable.fromAction {
Timber.d("task 1 START")
Thread.sleep(2000)
Timber.d("task 1 END")
}.subscribeOn(Schedulers.io()).toObservable()
}
private fun task2(): Observable<Unit> {
return Completable.fromAction {
Timber.d("task 2 START")
Thread.sleep(3000)
Timber.d("task 2 END")
}.subscribeOn(Schedulers.io()).toObservable()
}
override fun onCleared() {
super.onCleared()
compositeDisposable.clear()
}
}
実行結果
task 1 START
task 2 START
task 1 END
task 2 END
onComplete
並列処理の結果を使う場合
zipオペレータを使います。
MainActivityViewModel.kt
class MainActivityViewModel : ViewModel(), LifecycleObserver {
private val compositeDisposable = CompositeDisposable()
fun onClick() {
Observables.zip(task1(), task2()) { t1, t2 -> t1 + t2 }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Timber.d("onComplete: %d", it)
// do something...
}
.addTo(compositeDisposable)
}
private fun task1(): Observable<Int> {
return Single.create<Int> { emitter ->
Timber.d("task 1 START")
Thread.sleep(2000)
Timber.d("task 1 END")
emitter.onSuccess(1)
}.subscribeOn(Schedulers.io()).toObservable()
}
private fun task2(): Observable<Int> {
return Single.create<Int> { emitter ->
Timber.d("task 2 START")
Thread.sleep(3000)
Timber.d("task 2 END")
emitter.onSuccess(2)
}.subscribeOn(Schedulers.io()).toObservable()
}
override fun onCleared() {
super.onCleared()
compositeDisposable.clear()
}
}
実行結果
task 1 START
task 2 START
task 1 END
task 2 END
onComplete: 3
RxKotlinでzipを使う場合は Observable
ではなくて Observables
を使うと BiFunction<>
を省略することができます。
Kotlin Coroutines版もありますので参考にしてください。状況に応じてRxとcoroutinesを使い分けると良いでしょう。
Kotlin Coroutinesで非同期並列処理をする - Qiita