念のため前書き
この記事は公式のドキュメントと
Mori Atsushiさんの素晴らしい記事を
自分向けに雑にまとめた物です。
Flowの超基本的なところ
Flow(Flow,SharedFlow,StateFlowを纏めて呼んでる)には
コールドストリームとホットストリームに分けられる。
普通のFlowはコールドストリーム。
SharedFlow, StateFlowはホットストリーム。
[コールドストリームの分かりやすい説明]
(https://qiita.com/tonionagauzzi/items/12aa1a4400256cece72c#cold%E3%81%95%E3%82%93%E3%81%AE%E8%89%AF%E3%81%84%E3%81%A8%E3%81%93%E3%82%8D%E3%81%A8%E6%AC%A0%E7%82%B9)
[ホットストリームの分かりやすい説明]
(https://qiita.com/tonionagauzzi/items/12aa1a4400256cece72c#hot%E3%81%95%E3%82%93%E3%81%AE%E8%89%AF%E3%81%84%E3%81%A8%E3%81%93%E3%82%8D%E3%81%A8%E6%AC%A0%E7%82%B9)
Flowのメリット
FlowはView、ViewModel、Repositoryで一貫してFlowを受け渡すことで一貫性が生まれる。
あとは、必要な箇所でのみ動かすことが出来る(launchWhenXXX系)。
普通のFlowの使い方
基本の使い方
出力にはemit()を使用する
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
Repositoryで値を変換することも可能
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
リッスンの開始にはcollectを使用する。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
収集中のコルーチンをキャンセルした場合にデータストリームが終了する。
例外のキャッチにはcatchを使用する
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
flowOnでCoroutineContextを切り替える
.flowOn(defaultDispatcher)
callbackFlowを使用してコールバックベースのAPIをFlowに変換する
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
offer(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
send()を利用して別のCoroutineContextから or offer()を利用してCoroutineの外部から値を出力できる。
普通のFlowの注意点
このようなコードがあった場合
val flow = flow {
println("emit!")
emit(1)
}
flow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
flow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit!
emit!
onEach1:1
onEach2:1
emit()が起動した回数分呼ばれる。
そのため、以下のように重たい処理をしたい場合には不向き。
val flow = flow {
// 重たい処理(API call等)
}.map {
// 重たい処理(API call等)
}
// 重たい処理が毎回呼ばれる
flow.launchIn(GlobalScope)
また、コールドFlowのためflowに対して直接emitすることは出来ない。
val flow = flowOf(1)
flow.emit(2) // compile error!!
普通のFlowの問題点を解決するために出てきたのがSharedFlow
SharedFlowの作り方にはMutableSharedFlowとshareIn()がある。
SharedFlowはemit()が一度しか呼ばれないため、
複数箇所で起動することに適している。
だから「Shared」?
MutableSharedFlowの使い方
val mutableSharedFlow = MutableSharedFlow<Int>()
mutableSharedFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
mutableSharedFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking {
mutableSharedFlow.emit(1)
mutableSharedFlow.emit(2)
delay(100) // 処理が終わるまでちょっと待つ
}
ホットFlowのため、外部からemit()することが可能。
公式では以下のようなサンプルコードを載せている
// Class that centralizes when the content of the app needs to be refreshed
class TickHandler(
private val externalScope: CoroutineScope,
private val tickIntervalMs: Long = 5000
) {
// Backing property to avoid flow emissions from other classes
private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
val tickFlow: SharedFlow<Event<String>> = _tickFlow
init {
externalScope.launch {
while(true) {
_tickFlow.emit(Unit)
delay(tickIntervalMs)
}
}
}
}
class NewsRepository(
...,
private val tickHandler: TickHandler,
private val externalScope: CoroutineScope
) {
init {
externalScope.launch {
// Listen for tick updates
tickHandler.tickFlow.collect {
refreshLatestNews()
}
}
}
suspend fun refreshLatestNews() { ... }
...
}
replay を使用すると、以前に出力された複数の値を新しいサブスクライバに再送信できる。
onBufferOverflow を使用すると、バッファが送信アイテムでいっぱいになったときのポリシーを指定できる。
デフォルト値は BufferOverflow.SUSPEND で、呼び出し元を停止します。他のオプションには、DROP_LATEST と DROP_OLDEST がある。
shareInを使用してコールドFlowをホットFlow化する
shareInは通常のFlowをSharedFlowに変換するoperator。
shareInを使用する場合、次の情報を渡す必要がある。
- Flow の共有に使用する CoroutineScope。このスコープは、共有 Flow を必要な期間存続させるために、どのコンシューマよりも長く存続させる必要がある。
- 新しいコレクタそれぞれに対してリプレイするアイテムの数。
- 開始動作ポリシー。
shareInを使用することで、コールドFlowとは違い重たい処理を行いやすくなる。
val flow = flow {
println("emit!")
emit(1)
}
val sharedFlow = flow.shareIn(GlobalScope, SharingStarted.Eagerly)
sharedFlow.onEach {
println("onEach1:$it")
}.launchIn(GlobalScope)
sharedFlow.onEach {
println("onEach2:$it")
}.launchIn(GlobalScope)
runBlocking { delay(100) } // 処理が終わるまでちょっと待つ
emit! // emitが一度しか呼ばれていない
onEach2:1
onEach1:1
開始動作ポリシーには以下の3つがある。
- SharingStarted.WhileSubscribed() 開始ポリシーは、アクティブなサブスクライバが存在する間は、
アップストリーム プロデューサをアクティブに保つ - SharingStarted.Eagerlyは他の開始ポリシーとして、プロデューサをすぐに開始する
- SharingStarted.Lazilyは最初のサブスクライバが現れたときに共有を開始し、Flow を永続的にアクティブに保つ
StateFlow
StateFlowもホットストリームです。
状態保持を行うための特別なSharedFlowのイメージ。
MutableStateFlow、stateIn()を使用して作成可能。
SharedFlowと違い、
- 初期値が必須
- launchInしたタイミングで直近の値が1件流れてくる
- 値の設定はvalueで行え、coroutines scopeは必要ない
- 同じ値は流れない
- 連続で値が変更されると最後の値のみ流れてくる
基本の使い方
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
// Backing property to avoid state updates from other classes
private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
// The UI collects from this StateFlow to get its state updates
val uiState: StateFlow<LatestNewsUiState> = _uiState
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Update View with the latest favorite news
// Writes to the value property of MutableStateFlow,
// adding a new element to the flow and updating all
// of its collectors
.collect { favoriteNews ->
_uiState.value = LatestNewsUiState.Success(favoriteNews)
}
}
}
}
// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {
data class Success(news: List<ArticleHeadline>): LatestNewsUiState()
data class Error(exception: Throwable): LatestNewsUiState()
}
ViewからStateFlowを参照する場合
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
...
// This coroutine will run the given block when the lifecycle
// is at least in the Started state and will suspend when
// the view moves to the Stopped state
lifecycleScope.launchWhenStarted {
// Triggers the flow and starts listening for values
latestNewsViewModel.uiState.collect { uiState ->
// New value received
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
普通のFlowをリッスンする場合と変わらない。
launchWhenって何
ライフサイクルの状態によって動作を開始・終了するcoroutine scope。
launchWhenStartedの場合、クラスが起動した場合に実行される。
詳しくはここ
FlowをStateFlowに変換するにはstateInを使用する
StateFlowの動作を停止させるには
launchWhenStarted を使用して Flow を収集する上記の例では、View がバックグラウンドに移動して Flow 収集をトリガーするコルーチンが停止しても、下層のプロデューサはアクティブなまま。
手動で停止させるには
class LatestNewsActivity : AppCompatActivity() {
...
// Coroutine listening for UI states
private var uiStateJob: Job? = null
override fun onStart() {
super.onStart()
// Start collecting when the View is visible
uiStateJob = lifecycleScope.launch {
latestNewsViewModel.uiState.collect { uiState -> ... }
}
}
override fun onStop() {
// Stop collecting when the View goes to the background
uiStateJob?.cancel()
super.onStop()
}
}
Viewが表示されていないときに停止させるにはasLiveData()でLiveDataに変更するという手もある
class LatestNewsActivity : AppCompatActivity() {
...
override fun onCreate(savedInstanceState: Bundle?) {
...
latestNewsViewModel.uiState.asLiveData().observe(owner = this) { state ->
// Handle UI state
}
}
}