
Asynchronous parallel processing with RxKotlin

Posted at 2019-05-12

This article runs asynchronous tasks in parallel and waits until every one of them has completed. Calling onClick() starts the processing.

Just waiting for parallel tasks (no return value needed)

Use the merge operator. No items are emitted; the follow-up work is done in onComplete.

MainActivityViewModel.kt

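// Imports assume RxJava 2 / RxKotlin 2, RxAndroid, AndroidX Lifecycle and Timber.
import androidx.lifecycle.LifecycleObserver
import androidx.lifecycle.ViewModel
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.rxkotlin.addTo
import io.reactivex.schedulers.Schedulers
import timber.log.Timber

class MainActivityViewModel : ViewModel(), LifecycleObserver {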

    private val compositeDisposable = CompositeDisposable()

    fun onClick() {
        Observable.merge(task1(), task2())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({}, { e -> Timber.e(e) }, {
                // No items are emitted, so the follow-up work goes in onComplete.
                Timber.d("onComplete")
                // do something...
            })
            .addTo(compositeDisposable)
    }

    // Each task subscribes on its own io thread, which is what lets the tasks run in parallel.
    private fun task1(): Observable<Unit> {
        return Completable.fromAction {
            Timber.d("task 1 START")
            Thread.sleep(2000)
            Timber.d("task 1 END")
        }.subscribeOn(Schedulers.io()).toObservable()
    }

    private fun task2(): Observable<Unit> {
        return Completable.fromAction {
            Timber.d("task 2 START")
            Thread.sleep(3000)
            Timber.d("task 2 END")
        }.subscribeOn(Schedulers.io()).toObservable()
    }

    override fun onCleared() {
        super.onCleared()
        compositeDisposable.clear()
    }
}

Output

task 1 START
task 2 START
task 1 END
task 2 END
onComplete
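
Since neither task emits an item, the same wait can probably also be written with Completable directly, without converting to Observable. The following is a minimal sketch rather than the article's code: it assumes the RxJava 2.x packages (io.reactivex), runs on a plain JVM with blockingAwait() instead of the Android main-thread scheduler, and the names task, runBoth and log are hypothetical.

```kotlin
import io.reactivex.Completable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: a task that records its start and end in `log`.
fun task(name: String, millis: Long, log: MutableList<String>): Completable =
    Completable.fromAction {
        log.add("$name START")
        Thread.sleep(millis)
        log.add("$name END")
    }.subscribeOn(Schedulers.io()) // each task gets its own io thread

// Subscribes to both tasks at once and returns after both have completed.
fun runBoth(log: MutableList<String>) {
    Completable.mergeArray(task("task 1", 200, log), task("task 2", 300, log))
        .blockingAwait() // blocking is for this demo only
    log.add("onComplete")
}

fun main() {
    val log = CopyOnWriteArrayList<String>()
    runBoth(log)
    log.forEach(::println)
}
```

In a ViewModel, blockingAwait() would be replaced by observeOn(AndroidSchedulers.mainThread()) followed by subscribe(...) and addTo(compositeDisposable), as in the listing above.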

Using the results of parallel tasks

Use the zip operator.

MainActivityViewModel.kt
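// Imports assume RxJava 2 / RxKotlin 2, RxAndroid, AndroidX Lifecycle and Timber.
import androidx.lifecycle.LifecycleObserver
import androidx.lifecycle.ViewModel
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.rxkotlin.Observables
import io.reactivex.rxkotlin.addTo
import io.reactivex.schedulers.Schedulers
import timber.log.Timber

class MainActivityViewModel : ViewModel(), LifecycleObserver {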

    private val compositeDisposable = CompositeDisposable()

    fun onClick() {
        Observables.zip(task1(), task2()) { t1, t2 ->  t1 + t2 }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                // Receives the zipped result once both tasks have emitted.
                Timber.d("onComplete: %d", it)
                // do something...
            }, { e -> Timber.e(e) })
            .addTo(compositeDisposable)
    }

    // Each task subscribes on its own io thread, which is what lets the tasks run in parallel.
    private fun task1(): Observable<Int> {
        return Single.create<Int> { emitter ->
            Timber.d("task 1 START")
            Thread.sleep(2000)
            Timber.d("task 1 END")
            emitter.onSuccess(1)
        }.subscribeOn(Schedulers.io()).toObservable()
    }

    private fun task2(): Observable<Int> {
        return Single.create<Int> { emitter ->
            Timber.d("task 2 START")
            Thread.sleep(3000)
            Timber.d("task 2 END")
            emitter.onSuccess(2)
        }.subscribeOn(Schedulers.io()).toObservable()
    }

    override fun onCleared() {
        super.onCleared()
        compositeDisposable.clear()
    }
}

Output

task 1 START
task 2 START
task 1 END
task 2 END
onComplete: 3
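
Because both tasks are created as Single values, zipping them as Singles may be a little more direct than converting each to an Observable first. This is a sketch under assumptions, not the article's code: it uses Singles.zip from RxKotlin 2.x (package io.reactivex.rxkotlin), blocks with blockingGet() so that it runs on a plain JVM, and slowValue and sumInParallel are hypothetical names.

```kotlin
import io.reactivex.Single
import io.reactivex.rxkotlin.Singles
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: produces `value` after sleeping for `millis`.
fun slowValue(value: Int, millis: Long): Single<Int> =
    Single.fromCallable {
        Thread.sleep(millis)
        value
    }.subscribeOn(Schedulers.io()) // each Single gets its own io thread

// Runs both Singles in parallel and combines their results.
fun sumInParallel(): Int =
    Singles.zip(slowValue(1, 200), slowValue(2, 300)) { t1, t2 -> t1 + t2 }
        .blockingGet() // blocking is for this demo only

fun main() {
    println(sumInParallel()) // prints 3
}
```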

When using zip from RxKotlin, use Observables instead of Observable so that you can omit BiFunction<>.
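
To make the difference concrete, here is a small side-by-side sketch. It assumes the RxJava 2.x and RxKotlin 2.x packages (io.reactivex and io.reactivex.rxkotlin); the function names zipWithBiFunction and zipWithObservables are made up for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.rxkotlin.Observables

// Plain RxJava: the BiFunction type is written out explicitly.
fun zipWithBiFunction(a: Observable<Int>, b: Observable<Int>): Observable<Int> =
    Observable.zip(a, b, BiFunction<Int, Int, Int> { t1, t2 -> t1 + t2 })

// RxKotlin: Observables.zip accepts an ordinary Kotlin lambda.
fun zipWithObservables(a: Observable<Int>, b: Observable<Int>): Observable<Int> =
    Observables.zip(a, b) { t1, t2 -> t1 + t2 }

fun main() {
    val a = Observable.just(1)
    val b = Observable.just(2)
    println(zipWithBiFunction(a, b).blockingFirst())  // prints 3
    println(zipWithObservables(a, b).blockingFirst()) // prints 3
}
```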

A Kotlin Coroutines version is also available for reference. Choose between Rx and coroutines depending on the situation.
Asynchronous parallel processing with Kotlin Coroutines - Qiita
