前回の続きとなります
Coroutines Flowとは
Cold
データーストリームの一種
Cold?
RxJavaのObservableにもありますが、データストリームはCold
とHot
に大別できる
Cold
- 1つの消費者(生産者の対義語)に対して購読関係を結ぶ (=個別に値を流していく)
- Subscribeするまで値を流さない
- 通知するデータのタイムラインが購読されるたびに生成
- 大抵のObservableはCold
Hot
- 複数の消費者(生産者の対義語)に対して購読関係を結ぶ
- Subscribeしなくても値が流れていく
- すでに作成した通知するデータストリームに、後から消費者が加わる
- RxJavaでいうところのConnectableFlowable/ConnectableObserver
※上記2つの図はRxJavaリアクティブプログラミングから一部抜粋し、引用させていただきました
導入
def coroutines_ver = "1.3.3"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_ver"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$coroutines_ver"
CoroutinesFlowの使い方の流れ
- Flowでストリームを作成
- (任意)オペレータによる中間処理
- Flow.collectで値を受け取っていく
Flowの作り方
例としてここでは1から10までのストリームをあげてみます
flowOf
private fun getIntFlow1() = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
asFlow()
Iterable, Iterator, Sequenceに対して使うことができます
private fun getIntFlow2() = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).asFlow()
flow {}
private fun getIntFlow3() = flow {
repeat(10) {
emit(it + 1)
}
}
オペレータによる中間処理
Rxにあったfilterやmapなど中間処理の関数をサポートしています
中間処理をする時点ではsuspendではないためこの点も便利ですね
val ints = getIntFlow1()
.filter { it > 3 }
.map { value ->
"Value: $value"
}
.drop(1)
.take(2)
上記では3以上のものをフィルタリングし、文字列に変換し、最初の1つを削除して、その中から最初の2つを選ぶということを行っています
Experimentalなものも未だ多い現状ですが、他にも様々な中間処理が使えますのでこちらのExtension Functionsを御覧ください
Flow<T>.collect
Flow<T>
及び途中の中間処理まではsuspend関数ではないのでどこでも定義ができます
一方、受け取り側のFlow.collectはsuspend関数のため、必ずコルーチンスコープ内で実行する必要があります
(逆にスコープの閉じ忘れがない事は利点になる)
val ints = getIntFlow1()
GlobalScope.launch(Dispatchers.Main) {
ints.collect {
delay(1000L)
text.text = it
}
}
※textはtextView
結果として1からはじまり、10まで1秒ごとにTextViewのtextに代入されViewに表示されていくことになります
Flow<T>.collectIndexed
collectと同様に値も取得できますし、indexも取得できます
さいごに
かんたんにCoroutines Flowについてとりあげました
次は今回は分量上避けていたFlowBuilderのうちの一つであるchannelFlow<T>
に着目して取り上げてみたいと思います