Coroutines Flowとは
Kotlin Coroutines で実装された Cold ストリームです。
Rx を意識していますが、Coroutines の利点が生かされておりライフサイクルの管理がし易くなっています。
ズンドコキヨシとは
むかし流行ったやつ
ズンドコキヨシまとめ
検証バージョン
- Kotlin 1.4.10
- Coroutines 1.4.0
つくってみる
上流フローから順につくっていきます。
まず、ズン ドコの2つからランダムに1つ選択し一定間隔ごとに送出するFlowを定義します。
flow {
val source = listOf("ズン", "ドコ")
while (true) {
emit(source.shuffled().first())
delay(500)
}
}
次に、流れてきたデータが指定したパターンにマッチするか調べるためにバッファ化します。
今回はscanを使用し、リストに最大5個までデータを蓄積します。
また、scanの仕様として初期値(この例では空リスト)も次に流れてしまうため、dropWhileで空リストをスキップするようにします。
flow {
val source = listOf("ズン", "ドコ")
while (true) {
emit(source.shuffled().first())
delay(500)
}
}
.scan(emptyList<String>()) { list, s -> (list + s).takeLast(5) }
.dropWhile { it.isEmpty() }
次に、直近5個のバッファがズン・ズン・ズン・ズン・ドコの配列にマッチするかをチェックします。
まず最新のデータを送出し、パターンにマッチしていればワンテンポおいてからキ・ヨ・シ!を送出するFlowを用意し、flatMapConcatで繋げます。
flatMapには幾つかのバリエーションがありますが、出力される順序を重視するためflatMapConcatを使用しました。
val zunZunZunZunDoko = listOf("ズン", "ズン", "ズン", "ズン", "ドコ")
val KI_YO_SHI = "キ・ヨ・シ!"
// ...(省略)
.flatMapConcat { scanned ->
flow {
emit(scanned.last())
if (scanned == zunZunZunZunDoko) {
delay(500)
emit(KI_YO_SHI)
}
}
}
仕上げです。
onEachで流れてきたデータを出力します。
終了判定のためにtakeWhileを用い、キ・ヨ・シ!が流れてきたら終了します。
最後にcollectを呼ぶことで Cold ストリームが動作します。
// ...(省略)
.flatMapConcat { scanned ->
flow {
emit(scanned.last())
if (scanned == zunZunZunZunDoko) {
delay(500)
emit(KI_YO_SHI)
}
}
}
.onEach { println(it) }
.takeWhile { it != KI_YO_SHI }
.collect()
ズンドコキヨシ ver.1
@FlowPreview
@ExperimentalCoroutinesApi
suspend fun zunDokoKiyoshi() {
val zunZunZunZunDoko = listOf("ズン", "ズン", "ズン", "ズン", "ドコ")
val KI_YO_SHI = "キ・ヨ・シ!"
flow {
val source = listOf("ズン", "ドコ")
while (true) {
emit(source.shuffled().first())
delay(500)
}
}
.scan(emptyList<String>()) { list, s -> (list + s).takeLast(5) }
.dropWhile { it.isEmpty() }
.flatMapConcat { scanned ->
flow {
emit(scanned.last())
if (scanned == zunZunZunZunDoko) {
delay(500)
emit(KI_YO_SHI)
}
}
}
.onEach { println(it) }
.takeWhile { it != KI_YO_SHI }
.collect()
}
改良 & 一般化
ズンドコキヨシを一般化し、パラメータを次のように定義し、コードを改良していきます。
- 任意の型
Tのオブジェクトn個から成るランダムデータ元(source) - 任意の型
Tのオブジェクトm個から成る終了条件パターン(pattern) - 任意の型
Tの終端オブジェクト(terminal)
まず、リストを引数に取り、ランダムに1個を選んで送出し続けるFlowを返す関数を定義します。
送出間隔は生産側ではなく消費側で調節できるようにするためdelayを削除しています。
fun <T> randomFlow(source: List<T>) = flow<T> {
while (true) {
emit(source.shuffled().first())
}
}
次に、flatMapConcatの中身(キヨシチェックの部分)を関数として抽出します。
ここからもdelayを削除し、flowOfを使ってシンプル化しています。
パターンにマッチした場合は、terminalの後にnullを送出しています(理由は後述)。
fun <T> terminateIfSatisfiedFlow(
scanned: List<T>,
pattern: List<T>,
terminal: T
): Flow<T?> =
if (scanned == pattern) {
flowOf(scanned.last(), terminal, null)
} else {
flowOf(scanned.last())
}
最後に、ジェネリック・ズンドコキヨシの実装です。
takeWhileによる終了条件を!= terminalから!= nullに変更しています。
これは、terminal自身を終了条件にしてしまうと最終的なストリームにterminalが流れなくなるためです。
最後のfilterNotNull()はFlow<T?>をFlow<T>にアンラップするために用いています。
ジェネリック・ズンドコキヨシ(完成版)
@FlowPreview
@ExperimentalCoroutinesApi
fun <T> genericZDK(
source: List<T>,
pattern: List<T>,
terminal: T
): Flow<T> =
randomFlow(source)
.scan(emptyList<T>()) { list, e -> (list + e).takeLast(pattern.size) }
.dropWhile { it.isEmpty() }
.flatMapConcat { scanned -> terminateIfSatisfiedFlow(scanned, pattern, terminal) }
.takeWhile { it != null }
.filterNotNull()
テストします。
onEach内でdelayすることにより、送出テンポを調整できます。
@Test
fun getWild() = runBlocking {
genericZDK(
source = listOf("And", "Get", "Tough", "Wild"),
pattern = listOf("Get", "Wild", "And", "Tough"),
terminal = "退勤"
)
.onEach { delay(200) }
.collect { println(it) }
}
...
Get
And
And
And
And
Tough
Get
Get
And
Wild
Tough
Get
Wild
And
Tough
退勤
ジェネリクス化したことにより、equalsで比較可能な型であれば何でも適用することができます。
@Test
fun threeSeven() = runBlocking {
genericZDK<Any>(
source = (0..9).toList(),
pattern = listOf(7, 7, 7),
terminal = "You are lucky!"
)
.onEach { delay(50) }
.collectIndexed { index, value ->
print(value)
if (index % 10 == 9) {
println()
} else {
print(" ")
}
}
}
...
2 3 5 2 9 9 2 2 9 4
7 1 7 1 4 0 6 7 4 8
6 1 4 3 7 6 2 0 4 4
4 2 8 6 2 0 5 0 7 5
2 3 8 0 8 2 0 1 5 1
6 2 4 7 6 8 4 1 7 9
8 1 9 5 3 1 2 0 4 5
4 3 7 7 7 You are lucky!
おわりに
Coroutines Flow の優れた点はemitが suspend 関数になっていることで、今回のように無限にズンドコを送出するFlowを定義したとしても消費側でその速度を調節したり、「もういらないよ」と伝える(takeWhile)ことで処理をキャンセルすることができます。