N予備校 Androidチームでテックリードをしている鎌田です。
N予備校 Androidチームでは、非同期処理を RxJava から Kotlin Flow に移行しました。
本記事では、コード例を交えてこれまでの実装からどのように Kotlin Flow に移行したかの経緯、移行にあたってつまづいたポイントについてまとめます。
また、Kotlin Flow の細かい解説に関してはしませんのでご承知ください。
背景
これまでN予備校 Androidでは、非同期処理ライブラリとして RxJava 3 を使用してきましたが、以下の理由により移行に踏み切りました。
- RxJava の学習コストが高く、Compose と Flow の相性も良い
- Kotlin Coroutine, Flow は Android 開発の標準技術となっているため、参考にできる資料が増えてきている
- RxJava はバージョンアップの度に大幅なリファクタリングを行う必要があるため、RxJava への依存を解消することでメンテナンスコストが減る
- 詳細は、Androidアプリの非同期処理ライブラリをRxJava3にバージョンアップしましたを参照してみてください
- RxJava を使用したことがある人はどんどん減っており、採用活動においても不利になる
モジュール構成図
移行方法について説明する前に、N予備校 Androidのモジュール構成について簡単に示します。
モジュール構成図は以下のようになっております。
モジュール構成図の詳細は以下の記事に記載していますので、興味がありましたら参照してみてください。
6 年にわたる Android アプリの開発環境改善への取り組み
Kotlin Flow の学習
チーム内で Kotlin Flow に関する知見が無かったため、Google Codelab の教材を使って Kotlin Flow の学習をしました。
定例ミーティングの後の時間を使って 1 日 20 分ほどかけて学習しました。
コルーチンのライブラリの追加
各モジュールにコルーチンのライブラリを追加しました。
dependencies {
...
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4"
...
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.4"
}
非同期処理を Kotlin Flow に移行
モジュール構成図に基づいて、極力既存の処理への影響を抑えるために data レイヤーから移行しました。
Repository を Kotlin Flow に移行
各 Repository クラスを Kotlin Flow に移行しました。
UseCase を Kotlin Flow に置き換えるにあたって、Flow を流すためのメソッドを別途作成しておきました。
interface ExampleRepositoryContract {
fun getExampleData(): Single<ExampleData>
// 新規追加
suspend fun getExampleDataFlow(): Flow<ExampleData>
}
class ExampleRepository(...) : ExampleRepositoryContract {
override fun getExampleData(): Single<ExampleData> {
...
}
// 新規追加
override suspend fun getExampleDataFlow(): Flow<ExampleData> {
...
}
}
UseCase を Kotlin Flow に移行
各 UseCase クラスを Kotlin Flow に移行しました。
例えば以下の UseCase クラスがあるとします。
class FetchExampleData(
private val exampleRepository: ExampleRepositoryContract,
private val exampleScreen: ExampleScreenContract,
private val viewModel: ExampleViewModelContract
) : ObservableUseCase<ExampleData>() {
override val observable: Observable<ExampleData>
get() = exampleScreen.onResume
.flatMapSingle {
exampleRepository.getExampleData()
.onErrorResumeNext {
observable
}
}
override fun onNext(t: ExampleData) {
viewModel.setExampleData(t)
}
}
処理の流れとして、observable
に定義しているexampleScreen.onResume
が発火されると、exampleRepository#getExampleData
を呼び出します。
成功したらonNext
内の処理を実行し、失敗したらonErrorResumeNext
内の処理を実行してonNext
内の処理を実行しません。
上記を Kotlin Flow に置き換えるため、まず UseCase を発火させるための Flow を定義しました。
具体的には以下の通り置き換えました。
interface ExampleScreenContract {
val onResume: Observable<Unit>
// 新規追加
val onResumeFlow: Flow<Unit>
}
class ExampleScreenPresenter(...) : ExampleScreenContract {
private val onResumeSubject = PublishSubject.create<Unit>()
val onResume: Observable<Unit>
get() = onResumeSubject.hide()
// 新規追加
private val _onResume = MutableSharedFlow<Unit>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
override val onResumeFlow: Flow<Unit>
get() = _onResume.asSharedFlow()
fun onResume() {
onResumeSubject.onNext(Unit)
// 新規追加
_onResume.tryEmit(Unit)
}
}
ポイントとしては、いきなり置き換えるのはリスクが大きいため、既存の処理は残しつつ新規追加することです。
実際に値を流す(onNext
を呼び出す)には、emit
を使用しますが suspend 関数となり修正範囲が広がるため、suspend 関数でないtryEmit
を使用します。
tryEmit
を使用する場合、そのままだと Flow が発火されない問題があったため、Subject の種類によってextraBufferCapacity
またはreplay
、onBufferOverflow
を設定します。
それぞれ以下のように置き換えました。
RxJava | Kotlin Flow |
---|---|
PublishSubject.create | MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) |
BehaviorSubject.create | MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) |
BehaviorSubject.createDefault | MutableStateFlow |
ReplaySubject.create | MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) |
ただし、将来的には suspend 関数を一般化させたいため、各 UseCase クラスを Kotlin Flow に移行した後でemit
に切り替えます。
MutableSharedFlow
を Flow に変換する際はasSharedFlow()
、MutableStateFlow
を Flow に変換する際はasStateFlow()
を使用します。
呼び出し側で Mutable に戻せなくなり保守性が上がるためです。
また、MutableStateFlow
の値を更新する際、value
を使用するとコルーチンスコープで囲む必要がなくなります。
val _stateFlow = MutableStateFlow("a")
val stateFlow = _stateFlow.asStateFlow()
_stateFlow.value = "b"
println(stateFlow.value) // b
次に、UseCase は以下のように置き換えました。
@OptIn(FlowPreview::class)
class FetchExampleDataFlow(
private val exampleRepository: ExampleRepositoryContract,
private val exampleScreen: ExampleScreenContract,
private val viewModel: ExampleViewModelContract
) : FlowUseCase<ExampleData>() {
override val flow: Flow<ExampleData>
get() = exampleScreen.onResumeFlow
.flatMapConcat {
exampleRepository.getExampleDataFlow()
.catch {
emitAll(flow)
}
}
override suspend fun onEach(t: ExampleData) {
viewModel.setExampleData(t)
}
}
こちらもいきなり置き換えるのはリスクが大きいため、既存の UseCase クラスを残しつつ新規追加します。
RxJava の各種オペレータを Kotlin Flow に移行する際の置き換える方法については後ほど解説します。
基底クラスとしてFlowUseCase
クラスも作成しました。
abstract class FlowUseCase<T : Any> {
abstract val flow: Flow<T>
/**
* [flow]で流れてきた値を使ってメインスレッドで行う処理をここに記載する
*/
abstract suspend fun onEach(t: T)
suspend fun build(coroutineScope: CoroutineScope) {
flow
.onEach {
onEach(it)
}
.launchIn(coroutineScope)
}
}
次に、各 UseCase クラスをlaunchIn
を使って収集しました。
private val flowUseCases: List<FlowUseCase>
get() = listOf(
FetchExampleData(exampleRepository, exampleScreen, viewModel)
)
...
fun onViewCreated() {
fragment.viewLifecycleOwner.lifecycleScope.launch {
fragment.viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
flowUseCases.forEach {
it.build(this)
}
}
}
}
ライフサイクル対応収集を行うため、viewLifecycleOwner.repeatOnLifecycle
を使用します。
viewLifecycleOwner.repeatOnLifecycle
に指定する値については、UseCase をどのライフサイクルに紐づけるかによります。
詳細はLifecycle.State のリファレンスを参照ください。
UseCase のテストコードは以下のように作成しました。
@OptIn(ExperimentalCoroutinesApi::class)
class FetchExampleDataFlowTest {
private val exampleRepository: ExampleRepositoryContract = mock()
private val exampleScreen: ExampleScreenContract = mock()
private val viewModel: ExampleViewModelContract = mock()
private val useCase = FetchExampleDataFlow(exampleRepository, exampleScreen, viewModel)
@BeforeTest
fun setUp() {
whenever(exampleScreen.onResumeFlow).thenReturn(flowOf(Unit))
}
@Test
fun `ExampleDataの取得に成功したらViewModelに設定されること`() = runTest {
val exampleData = ExampleData()
whenever(exampleRepository.getExampleDataFlow()).thenReturn(flowOf(exampleData))
useCase.build()
verify(viewModel).setExampleData(exampleData)
}
@Test
fun `ExampleDataの取得に失敗したらViewModelに設定されないこと`() = runTest {
val throwable = Throwable()
whenever(exampleRepository.getExampleDataFlow()).thenReturn(flow { throw throwable })
useCase.build()
verify(viewModel, never()).setExampleData(any())
}
}
...
suspend fun <T : Any> FlowUseCase<T>.build() = flow.onEach { onEach(it) }.collect()
ポイントとしては、各テスト関数内で suspend 関数を呼ぶため、runTest
ブロックで囲むことです。
また、テスト用ライブラリとして主に以下を使用しています。
N予備校 Androidにおけるテスト用ライブラリの導入話もありますので、興味がありましたら参照してみてください。
RxJava の各種オペレータを Kotlin Flow に移行する際の置き換え方法
基本的には、Migration.ktに基づいて置き換えました。
主に以下のように置き換えました。
RxJava | Kotlin Flow |
---|---|
Observable.just(...) Single.just(...) Flowable.just(...) |
flowOf(...) |
Observable.just(Unit).flatMap { ... } | flow { emitAll(...) } |
Observable.error(...) Single.error(...) |
flow { throw ... } |
onErrorResumeNext { observable } | catch { emitAll(flow) } |
onErrorResumeNext { Single.never() } onErrorResumeNext { Observable.never() } |
catch {} |
Single.never() Observable.empty() Observable.never() |
emptyFlow() |
throttleLast(200, TimeUnit.MILLISECONDS) | sample(200.milliseconds) |
onBackPressureDrop() | conflate() |
delay(500, TimeUnit.MILLISECONDS) | delay(500.milliseconds) |
TestScheduler#advanceTimeTo(20, TimeUnit.SECONDS) | 基本的には関数自体を runTest ブロック内で実行し、コルーチンの遅延がすべて自動的にスキップされるため、遅延しているかどうかをテストする必要がなければなくてもよいです。 delay で遅延しているかどうかをテストするためには、runTest ブロック内で currentTime を使うことで内部時間を計測できます。 assertEquals(20.seconds.inWholeMilliseconds, currentTime) |
上記以外のオペレータを置き換える必要がある場合、kotlinx.coroutinesのissueを参照してみてください。
必要に応じて、拡張関数として作成しておくと便利です。
一例として、以下は Rx の concat を Kotlin Flow に置き換えたものです。
/**
* Rx の concat を Kotlin Flowに置き換えたもの。
*/
@OptIn(FlowPreview::class)
fun <T> concat(vararg flows: Flow<T>) = flows.asFlow().flattenConcat()
上記の手順で置き換えできたら、古い UseCase クラスを削除し、参照されなくなった RxJava 関連の処理を削除します。
interface ExampleRepositoryContract {
suspend fun getExampleDataFlow(): Flow<ExampleData>
}
class ExampleRepository(...) : ExampleRepositoryContract {
override suspend fun getExampleDataFlow(): Flow<ExampleData> {
...
}
}
interface ExampleScreenContract {
val onResumeFlow: Flow<Unit>
}
class ExampleScreenPresenter(...) : ExampleScreenContract {
private val _onResume = MutableSharedFlow<Unit>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
override val onResumeFlow: Flow<Unit>
get() = _onResume.asSharedFlow()
fun onResume() {
_onResume.tryEmit(Unit)
}
}
最後に置き換え後の UseCase クラス名や各種メソッド名から語尾に付いているFlow
を削除しました。
-
FetchExampleDataFlow
→FetchExampleData
-
FetchExampleDataFlowTest
→FetchExampleDataTest
-
ExampleRepositoryContract#getExampleDataFlow
→ExampleRepositoryContract#getExampleData
-
ExampleScreenContract.onResumeFlow
→ExampleScreenContract.onResume
tryEmit を emit に置き換える
Flow を発火させる際、影響範囲を最小限にするためtryEmit
を使用していましたが、プロジェクト全体で suspend 関数を一般化させたいため、emit
に切り替えました。
具体的には、Flow 型のプロパティおよびそれをemit
するメソッドを ViewModel クラスに移行しました。
例えばExampleScreenContract.onResume
とそれをemit
するメソッドをExampleViewModelContract
に移行した例を示します。
interface ExampleViewModelContract {
...
val onResume: Flow<Unit>
}
class ExampleViewModel : ViewModel(), ExampleViewModelContract {
...
private val _onResume = MutableSharedFlow<Unit>()
override val onResume: Flow<Unit>
get() = _onResume.asSharedFlow()
fun onResume() = viewModelScope.launch {
_onResume.emit(Unit)
}
}
emit
は suspend 関数であるため、呼ぶためにはコルーチンスコープで囲む必要があります。
ViewModel を継承したクラスではviewModelScope
が使えます。
emit
に切り替えることで、extraBufferCapacity
を指定しなくてもユースケース側で購読できるようになるため、指定を外しました。
また、BufferOverflow.DROP_OLDEST
はextraBufferCapacity
かreplay
を 1 以上にしないと意味がないため、こちらも指定を外しました。
class ExampleScreenPresenter(
private val viewModel: ExampleViewModel
...
) {
fun onResume() {
viewModel.onResume()
}
...
}
@OptIn(FlowPreview::class)
class FetchExampleData(
private val exampleRepository: ExampleRepositoryContract,
private val viewModel: ExampleViewModelContract
) : FlowUseCase<ExampleData>() {
override val flow: Flow<ExampleData>
get() = viewModel.onResume
.flatMapConcat {
exampleRepository.getExampleData()
.catch {
emitAll(flow)
}
}
override suspend fun onEach(t: ExampleData) {
viewModel.setExampleData(t)
}
}
ExampleScreenContract
は不要になったため削除しました。
つまずいたポイント
Flow に値を流した際、該当の UseCase が起動しない
Flow に値を流した際、該当の UseCase がまだ収集されていなくて起動しないケースがありました。
その場合、viewLifecycleOwner.repeatOnLifecycle
に指定する値を変更するか、MutableSharedFlow
にreplay = 1
を設定して対処しました。
エラーハンドリング不要な UseCase にて処理失敗時、アプリがクラッシュしてしまう
エラーハンドリング不要な UseCase にて処理失敗時、アプリがクラッシュしてしまうケースがありました。
その場合、空のcatch
を実装してエラーを握りつぶすようにしました。
flow {
...
}.catch {}
まとめ
本記事では、N予備校 Androidチームにてコード例を交えてこれまでの実装からどのように Kotlin Flow に移行したかの経緯、移行にあたってつまづいたポイントについてまとめました。
UseCase クラス数がかなり多いため、全ての UseCase クラスを対応するのに半年ほどかかりましたが、無事に Rx 系のライブラリを削除でき、他の開発の手を止めることなく進めることができました。
極力既存の処理への影響を少なくため、1 つずつ置き換えることがポイントです。
kotlinx-coroutines-rx 等の導入も検討しましたが、挙動が既存の動きと変わることがあったため導入しませんでした。
RxJava から Kotlin Flow に移行する際の助けになれば幸いです。