Edited at

RxJavaを使っている既存アプリに Kotlin Coroutinesを導入しよう

More than 1 year has passed since last update.

Kotlin FestのLTのネタです。(3分でどこまで話せるか謎です。)


きっかけ

RxJavaとCoroutinesについてJakeさんが以下のように言っていて、確かにsuspend functionのほうがわかりやすく書けると自分も感じています。

また既存のアプリの実装を思い浮かべたときにほとんどのRxJavaを使っているところがSingleまたはCompletableなどだったので、置き換えたほうが良さそうでは?と思いました。

個人的にはRxJavaのObservableで使っているところは、Coroutinesではそこまでスマートにはかけなさそうで、そのままでもいいかなという感じです。

そこですごく簡単なアプリで、移行するサンプルアプリを書いてみました。


例となるRxJavaのアプリ

簡単な例で、ViewModelがRxJavaを使ってApiクラスから情報を取得して、LiveDataに流すだけです。

class ViewModel(

private val api: Api = Api(),
private val scheduler: Scheduler = Schedulers.io()
) {
private val mutablePersons = MutableLiveData<List<Person>>()
val persons: LiveData<List<Person>> = mutablePersons

fun onCreate() {
api.fetchPersons()
.subscribeOn(scheduler)
.subscribe(mutablePersons::postValue)
}
}


置き換えする時の問題

大きなアプリで、置き換えていくときApiのクラスの実装を変えてしまうと既存のすべての実装を置き換える必要がありそうです :thinking:


kotlinx-coroutines-rx2

そこで、kotlinx-coroutines-rx2を使うと、以下のように変換できます。

import kotlinx.coroutines.experimental.rx2.await

class ViewModel(
private val api: Api = Api(),
private val coroutineContext: CoroutineContext = CommonPool
) {
private val mutablePersons = MutableLiveData<List<Person>>()
val persons: LiveData<List<Person>> = mutablePersons

fun onCreate() {
launch(coroutineContext) { // Kotlin Coroutines起動
// ** ここでRxJavaのSingleを中断関数に変換 **
val persons = api.fetchPersons().await()
mutablePersons.postValue(persons)
}
}
}

kotlinx-coroutines-rx2を使うことで、逆にsuspendメソッドからRxJavaのsingleにも置き換えができます。


サンプルアプリを用意

置き換えていけるのか検証の意味も込めて、以下のようにサンプルアプリを用意しました。

https://github.com/takahirom/rxjava-2-kotlion-coroutines

Source code
test
image

RxJavaのアプリ
Source code
Test
image

CoroutinesからRxJavaのSingleを使う例
Source code
Test
image

RxJavaからCoroutinesのsuspend functionを使う例
Source code
Test
image

Coroutinesを使う例
Source code
Test
image


テスト、エラーハンドリング、ライフサイクルを考慮した例

実際に書いていくときはいくつかのポイントを意識する必要があるのでそれを含んだサンプルも用意しています。

https://github.com/takahirom/rxjava-2-kotlion-coroutines/tree/master/5_coroutines_advanced/src

ポイント1: CoroutineContextを渡すことで、テスト時に切り替えられるようにして、テスト可能にする。

ポイント2: Jobを親として渡して、それをViewModel#onClearedでcancelすることで、使われない無駄な処理をさせないことができる。(これはなくても問題なく動作し、オプションと言えそう)

ポイント3: RxJavaのonErrorで処理をしていたのであれば、同じように、launch内でクラッシュしたときにハンドリングしてあげる必要がある。

ポイント4: suspend functionを使っているのでawaitを書く必要がない。Kotlinではこれが

他にもViewModelが外からいじれないようにフィールドを分けたり、SingleLiveDataを使って、画面回転時に知らせないようにするなど、工夫をしていますが、Coroutinesとは関係ないので、省略します。

class PersonsViewModel(

private val api: Api = Api(),
coroutineContext: CoroutineContext = CommonPool // ポイント1
) : ViewModel() {
private val compositeJob = Job() // ポイント2
private val mutablePersons = MutableLiveData<List<Person>>()
val persons: LiveData<List<Person>> = mutablePersons

private val singleError: SingleLiveData<Exception> = SingleLiveData()
val errors: LiveData<Exception> = singleError

init {
launch(coroutineContext, parent = compositeJob) { // ポイント2
try { // ポイント3
val persons = api.fetchPersons() // ポイント4
mutablePersons.postValue(persons)
} catch (e: Exception) {
singleError.postValue(e)
}
}
}

override fun onCleared() {
compositeJob.cancel()
}
}


RxJavaのSchedulerとスレッドを共有したい

RxJavaのioスレッドは1スレッドで動きますが、Coroutineでも独自のスレッドを作っていた場合、2スレッドになってしまい、マルチスレッドを考慮したプログラミングが必要になってしまいます。

そこでRxJavaのスケジューラーをasCoroutineDispatcher()を使うことでCoroutineからSchedulerのスレッドが利用できます。

AppCoroutineDispatchers

class AppCoroutineDispatchers(

val io: CoroutineContext = Schedulers.io().asCoroutineDispatcher(),
val computation: CoroutineContext = Schedulers.computation().asCoroutineDispatcher(),
val ui: CoroutineContext = UI
)

DaggerのProvider

  @Provides @Singleton 

fun provideDefaultCoroutineContext(): AppCoroutineDispatchers {
return AppCoroutineDispatchers()
}

Coroutineを使うクラス

class Hoge @Inject constructor(

private val coroutineDispatchers: AppCoroutineDispatchers
...
launch(coroutineDispatchers.io) {


まとめ

まだまだ知見やベストプラクティスを探っている状態ですが、見つけていきたいです。

こうしたらいいなどのコメントをTwitterなどでもいいのでお待ちしています :bow: