20
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

N予備校 Android アプリで RxJava から Kotlin Flow に移行した話

Last updated at Posted at 2023-04-04

N予備校 Androidチームでテックリードをしている鎌田です。

N予備校 Androidチームでは、非同期処理を RxJava から Kotlin Flow に移行しました。

本記事では、コード例を交えてこれまでの実装からどのように Kotlin Flow に移行したかの経緯、移行にあたってつまづいたポイントについてまとめます。

また、Kotlin Flow の細かい解説に関してはしませんのでご承知ください。

背景

これまでN予備校 Androidでは、非同期処理ライブラリとして RxJava 3 を使用してきましたが、以下の理由により移行に踏み切りました。

  • RxJava の学習コストが高く、Compose と Flow の相性も良い
  • Kotlin Coroutine, Flow は Android 開発の標準技術となっているため、参考にできる資料が増えてきている
  • RxJava はバージョンアップの度に大幅なリファクタリングを行う必要があるため、RxJava への依存を解消することでメンテナンスコストが減る
  • RxJava を使用したことがある人はどんどん減っており、採用活動においても不利になる

モジュール構成図

移行方法について説明する前に、N予備校 Androidのモジュール構成について簡単に示します。
モジュール構成図は以下のようになっております。

module_configuration_diagram.png

モジュール構成図の詳細は以下の記事に記載していますので、興味がありましたら参照してみてください。

6 年にわたる Android アプリの開発環境改善への取り組み

Kotlin Flow の学習

チーム内で Kotlin Flow に関する知見が無かったため、Google Codelab の教材を使って Kotlin Flow の学習をしました。

定例ミーティングの後の時間を使って 1 日 20 分ほどかけて学習しました。

コルーチンのライブラリの追加

各モジュールにコルーチンのライブラリを追加しました。

build.gradle
dependencies {
    ...
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4"
 
    ...
    testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.4"
}

非同期処理を Kotlin Flow に移行

モジュール構成図に基づいて、極力既存の処理への影響を抑えるために data レイヤーから移行しました。

Repository を Kotlin Flow に移行

各 Repository クラスを Kotlin Flow に移行しました。

UseCase を Kotlin Flow に置き換えるにあたって、Flow を流すためのメソッドを別途作成しておきました。

ExampleRepositoryContract.kt
interface ExampleRepositoryContract {
    fun getExampleData(): Single<ExampleData>
    // 新規追加
    suspend fun getExampleDataFlow(): Flow<ExampleData>
}
ExampleRepository.kt
class ExampleRepository(...) : ExampleRepositoryContract {
    override fun getExampleData(): Single<ExampleData> {
        ...
    }
    // 新規追加
    override suspend fun getExampleDataFlow(): Flow<ExampleData> {
        ...
    }
}

UseCase を Kotlin Flow に移行

各 UseCase クラスを Kotlin Flow に移行しました。

例えば以下の UseCase クラスがあるとします。

FetchExampleData.kt
class FetchExampleData(
    private val exampleRepository: ExampleRepositoryContract,
    private val exampleScreen: ExampleScreenContract,
    private val viewModel: ExampleViewModelContract
) : ObservableUseCase<ExampleData>() {
    override val observable: Observable<ExampleData>
        get() = exampleScreen.onResume
            .flatMapSingle {
                exampleRepository.getExampleData()
                    .onErrorResumeNext {
                        observable
                    }
            }

    override fun onNext(t: ExampleData) {
        viewModel.setExampleData(t)
    }
}

処理の流れとして、observableに定義しているexampleScreen.onResumeが発火されると、exampleRepository#getExampleDataを呼び出します。
成功したらonNext内の処理を実行し、失敗したらonErrorResumeNext内の処理を実行してonNext内の処理を実行しません。

上記を Kotlin Flow に置き換えるため、まず UseCase を発火させるための Flow を定義しました。

具体的には以下の通り置き換えました。

ExampleScreenContract.kt
interface ExampleScreenContract {
    val onResume: Observable<Unit>
    // 新規追加
    val onResumeFlow: Flow<Unit>
}
ExampleScreenPresenter.kt
class ExampleScreenPresenter(...) : ExampleScreenContract {
    private val onResumeSubject = PublishSubject.create<Unit>()
    val onResume: Observable<Unit>
        get() = onResumeSubject.hide()

    // 新規追加
    private val _onResume = MutableSharedFlow<Unit>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    override val onResumeFlow: Flow<Unit>
        get() = _onResume.asSharedFlow()
 
    fun onResume() {
        onResumeSubject.onNext(Unit)
        // 新規追加
        _onResume.tryEmit(Unit)
    }
}

ポイントとしては、いきなり置き換えるのはリスクが大きいため、既存の処理は残しつつ新規追加することです。

実際に値を流す(onNextを呼び出す)には、emitを使用しますが suspend 関数となり修正範囲が広がるため、suspend 関数でないtryEmitを使用します。

tryEmitを使用する場合、そのままだと Flow が発火されない問題があったため、Subject の種類によってextraBufferCapacityまたはreplayonBufferOverflowを設定します。

それぞれ以下のように置き換えました。

RxJava Kotlin Flow
PublishSubject.create MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
BehaviorSubject.create MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
BehaviorSubject.createDefault MutableStateFlow
ReplaySubject.create MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

ただし、将来的には suspend 関数を一般化させたいため、各 UseCase クラスを Kotlin Flow に移行した後でemitに切り替えます。

MutableSharedFlowを Flow に変換する際はasSharedFlow()MutableStateFlowを Flow に変換する際はasStateFlow()を使用します。

呼び出し側で Mutable に戻せなくなり保守性が上がるためです。

また、MutableStateFlowの値を更新する際、valueを使用するとコルーチンスコープで囲む必要がなくなります。

val _stateFlow = MutableStateFlow("a")
val stateFlow = _stateFlow.asStateFlow()

_stateFlow.value = "b"
println(stateFlow.value) // b

次に、UseCase は以下のように置き換えました。

FetchExampleDataFlow.kt
@OptIn(FlowPreview::class)
class FetchExampleDataFlow(
    private val exampleRepository: ExampleRepositoryContract,
    private val exampleScreen: ExampleScreenContract,
    private val viewModel: ExampleViewModelContract
) : FlowUseCase<ExampleData>() {
    override val flow: Flow<ExampleData>
        get() = exampleScreen.onResumeFlow
            .flatMapConcat {
                exampleRepository.getExampleDataFlow()
                    .catch {
                        emitAll(flow)
                    }
            }

    override suspend fun onEach(t: ExampleData) {
        viewModel.setExampleData(t)
    }
}

こちらもいきなり置き換えるのはリスクが大きいため、既存の UseCase クラスを残しつつ新規追加します。

RxJava の各種オペレータを Kotlin Flow に移行する際の置き換える方法については後ほど解説します。

基底クラスとしてFlowUseCaseクラスも作成しました。

FlowUseCase.kt
abstract class FlowUseCase<T : Any> {
    abstract val flow: Flow<T>

    /**
     * [flow]で流れてきた値を使ってメインスレッドで行う処理をここに記載する
     */
    abstract suspend fun onEach(t: T)

    suspend fun build(coroutineScope: CoroutineScope) {
        flow
            .onEach {
                onEach(it)
            }
            .launchIn(coroutineScope)
    }
}

次に、各 UseCase クラスをlaunchInを使って収集しました。

private val flowUseCases: List<FlowUseCase>
    get() = listOf(
        FetchExampleData(exampleRepository, exampleScreen, viewModel)
    )
...
fun onViewCreated() {
    fragment.viewLifecycleOwner.lifecycleScope.launch {
        fragment.viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
            flowUseCases.forEach {
                it.build(this)
            }
        }
    }
}

ライフサイクル対応収集を行うため、viewLifecycleOwner.repeatOnLifecycleを使用します。

viewLifecycleOwner.repeatOnLifecycleに指定する値については、UseCase をどのライフサイクルに紐づけるかによります。

詳細はLifecycle.State のリファレンスを参照ください。

UseCase のテストコードは以下のように作成しました。

@OptIn(ExperimentalCoroutinesApi::class)
class FetchExampleDataFlowTest {
    private val exampleRepository: ExampleRepositoryContract = mock()
    private val exampleScreen: ExampleScreenContract = mock()
    private val viewModel: ExampleViewModelContract = mock()

    private val useCase = FetchExampleDataFlow(exampleRepository, exampleScreen, viewModel)

    @BeforeTest
    fun setUp() {
        whenever(exampleScreen.onResumeFlow).thenReturn(flowOf(Unit))
    }

    @Test
    fun `ExampleDataの取得に成功したらViewModelに設定されること`() = runTest {
        val exampleData = ExampleData()
        whenever(exampleRepository.getExampleDataFlow()).thenReturn(flowOf(exampleData))

        useCase.build()

        verify(viewModel).setExampleData(exampleData)
    }

    @Test
    fun `ExampleDataの取得に失敗したらViewModelに設定されないこと`() = runTest {
        val throwable = Throwable()
        whenever(exampleRepository.getExampleDataFlow()).thenReturn(flow { throw throwable })

        useCase.build()

        verify(viewModel, never()).setExampleData(any())
    }
}
...
suspend fun <T : Any> FlowUseCase<T>.build() = flow.onEach { onEach(it) }.collect()

ポイントとしては、各テスト関数内で suspend 関数を呼ぶため、runTestブロックで囲むことです。

また、テスト用ライブラリとして主に以下を使用しています。

N予備校 Androidにおけるテスト用ライブラリの導入話もありますので、興味がありましたら参照してみてください。

RxJava の各種オペレータを Kotlin Flow に移行する際の置き換え方法

基本的には、Migration.ktに基づいて置き換えました。

主に以下のように置き換えました。

RxJava Kotlin Flow
Observable.just(...)
Single.just(...)
Flowable.just(...)
flowOf(...)
Observable.just(Unit).flatMap { ... } flow { emitAll(...) }
Observable.error(...)
Single.error(...)
flow { throw ... }
onErrorResumeNext { observable } catch { emitAll(flow) }
onErrorResumeNext { Single.never() }
onErrorResumeNext { Observable.never() }
catch {}
Single.never()
Observable.empty()
Observable.never()
emptyFlow()
throttleLast(200, TimeUnit.MILLISECONDS) sample(200.milliseconds)
onBackPressureDrop() conflate()
delay(500, TimeUnit.MILLISECONDS) delay(500.milliseconds)
TestScheduler#advanceTimeTo(20, TimeUnit.SECONDS) 基本的には関数自体を runTest ブロック内で実行し、コルーチンの遅延がすべて自動的にスキップされるため、遅延しているかどうかをテストする必要がなければなくてもよいです。
delay で遅延しているかどうかをテストするためには、runTest ブロック内で currentTime を使うことで内部時間を計測できます。
assertEquals(20.seconds.inWholeMilliseconds, currentTime)

上記以外のオペレータを置き換える必要がある場合、kotlinx.coroutinesのissueを参照してみてください。

必要に応じて、拡張関数として作成しておくと便利です。

一例として、以下は Rx の concat を Kotlin Flow に置き換えたものです。

FlowExtension.kt
/**
 * Rx の concat を Kotlin Flowに置き換えたもの。
 */
@OptIn(FlowPreview::class)
fun <T> concat(vararg flows: Flow<T>) = flows.asFlow().flattenConcat()

上記の手順で置き換えできたら、古い UseCase クラスを削除し、参照されなくなった RxJava 関連の処理を削除します。

ExampleRepositoryContract.kt
interface ExampleRepositoryContract {
    suspend fun getExampleDataFlow(): Flow<ExampleData>
}
ExampleRepository.kt
class ExampleRepository(...) : ExampleRepositoryContract {
    override suspend fun getExampleDataFlow(): Flow<ExampleData> {
        ...
    }
}
ExampleScreenContract.kt
interface ExampleScreenContract {
    val onResumeFlow: Flow<Unit>
}
ExampleScreenPresenter.kt
class ExampleScreenPresenter(...) : ExampleScreenContract {
    private val _onResume = MutableSharedFlow<Unit>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    override val onResumeFlow: Flow<Unit>
        get() = _onResume.asSharedFlow()
 
    fun onResume() {
        _onResume.tryEmit(Unit)
    }
}

最後に置き換え後の UseCase クラス名や各種メソッド名から語尾に付いているFlowを削除しました。

  • FetchExampleDataFlowFetchExampleData
  • FetchExampleDataFlowTestFetchExampleDataTest
  • ExampleRepositoryContract#getExampleDataFlowExampleRepositoryContract#getExampleData
  • ExampleScreenContract.onResumeFlowExampleScreenContract.onResume

tryEmit を emit に置き換える

Flow を発火させる際、影響範囲を最小限にするためtryEmitを使用していましたが、プロジェクト全体で suspend 関数を一般化させたいため、emitに切り替えました。

具体的には、Flow 型のプロパティおよびそれをemitするメソッドを ViewModel クラスに移行しました。

例えばExampleScreenContract.onResumeとそれをemitするメソッドをExampleViewModelContractに移行した例を示します。

ExampleViewModelContract.kt
interface ExampleViewModelContract {
    ...
    val onResume: Flow<Unit>
}
ExampleViewModel.kt
class ExampleViewModel : ViewModel(), ExampleViewModelContract {
    ...
    private val _onResume = MutableSharedFlow<Unit>()
    override val onResume: Flow<Unit>
        get() = _onResume.asSharedFlow()
 
    fun onResume() = viewModelScope.launch {
        _onResume.emit(Unit)
    }
}

emitは suspend 関数であるため、呼ぶためにはコルーチンスコープで囲む必要があります。

ViewModel を継承したクラスではviewModelScopeが使えます。

emitに切り替えることで、extraBufferCapacityを指定しなくてもユースケース側で購読できるようになるため、指定を外しました。

また、BufferOverflow.DROP_OLDESTextraBufferCapacityreplayを 1 以上にしないと意味がないため、こちらも指定を外しました。

ExampleScreenPresenter.kt
class ExampleScreenPresenter(
    private val viewModel: ExampleViewModel
    ...
) {
    fun onResume() {
        viewModel.onResume()
    }
    ...
}
FetchExampleData.kt
@OptIn(FlowPreview::class)
class FetchExampleData(
    private val exampleRepository: ExampleRepositoryContract,
    private val viewModel: ExampleViewModelContract
) : FlowUseCase<ExampleData>() {
    override val flow: Flow<ExampleData>
        get() = viewModel.onResume
            .flatMapConcat {
                exampleRepository.getExampleData()
                    .catch {
                        emitAll(flow)
                    }
            }

    override suspend fun onEach(t: ExampleData) {
        viewModel.setExampleData(t)
    }
}

ExampleScreenContractは不要になったため削除しました。

つまずいたポイント

Flow に値を流した際、該当の UseCase が起動しない

Flow に値を流した際、該当の UseCase がまだ収集されていなくて起動しないケースがありました。

その場合、viewLifecycleOwner.repeatOnLifecycleに指定する値を変更するか、MutableSharedFlowreplay = 1を設定して対処しました。

エラーハンドリング不要な UseCase にて処理失敗時、アプリがクラッシュしてしまう

エラーハンドリング不要な UseCase にて処理失敗時、アプリがクラッシュしてしまうケースがありました。

その場合、空のcatchを実装してエラーを握りつぶすようにしました。

flow {
    ...
}.catch {}

まとめ

本記事では、N予備校 Androidチームにてコード例を交えてこれまでの実装からどのように Kotlin Flow に移行したかの経緯、移行にあたってつまづいたポイントについてまとめました。

UseCase クラス数がかなり多いため、全ての UseCase クラスを対応するのに半年ほどかかりましたが、無事に Rx 系のライブラリを削除でき、他の開発の手を止めることなく進めることができました。

極力既存の処理への影響を少なくため、1 つずつ置き換えることがポイントです。

kotlinx-coroutines-rx 等の導入も検討しましたが、挙動が既存の動きと変わることがあったため導入しませんでした。

RxJava から Kotlin Flow に移行する際の助けになれば幸いです。

20
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?