Coroutines Flowとは
Kotlin Coroutines で実装された Cold ストリームです。
Rx を意識していますが、Coroutines の利点が生かされておりライフサイクルの管理がし易くなっています。
ズンドコキヨシとは
むかし流行ったやつ
ズンドコキヨシまとめ
検証バージョン
- Kotlin 1.4.10
- Coroutines 1.4.0
つくってみる
上流フローから順につくっていきます。
まず、ズン
ドコ
の2つからランダムに1つ選択し一定間隔ごとに送出するFlow
を定義します。
flow {
val source = listOf("ズン", "ドコ")
while (true) {
emit(source.shuffled().first())
delay(500)
}
}
次に、流れてきたデータが指定したパターンにマッチするか調べるためにバッファ化します。
今回はscan
を使用し、リストに最大5個までデータを蓄積します。
また、scan
の仕様として初期値(この例では空リスト)も次に流れてしまうため、dropWhile
で空リストをスキップするようにします。
flow {
val source = listOf("ズン", "ドコ")
while (true) {
emit(source.shuffled().first())
delay(500)
}
}
.scan(emptyList<String>()) { list, s -> (list + s).takeLast(5) }
.dropWhile { it.isEmpty() }
次に、直近5個のバッファがズン・ズン・ズン・ズン・ドコ
の配列にマッチするかをチェックします。
まず最新のデータを送出し、パターンにマッチしていればワンテンポおいてからキ・ヨ・シ!
を送出するFlow
を用意し、flatMapConcat
で繋げます。
flatMap
には幾つかのバリエーションがありますが、出力される順序を重視するためflatMapConcat
を使用しました。
val zunZunZunZunDoko = listOf("ズン", "ズン", "ズン", "ズン", "ドコ")
val KI_YO_SHI = "キ・ヨ・シ!"
// ...(省略)
.flatMapConcat { scanned ->
flow {
emit(scanned.last())
if (scanned == zunZunZunZunDoko) {
delay(500)
emit(KI_YO_SHI)
}
}
}
仕上げです。
onEach
で流れてきたデータを出力します。
終了判定のためにtakeWhile
を用い、キ・ヨ・シ!
が流れてきたら終了します。
最後にcollect
を呼ぶことで Cold ストリームが動作します。
// ...(省略)
.flatMapConcat { scanned ->
flow {
emit(scanned.last())
if (scanned == zunZunZunZunDoko) {
delay(500)
emit(KI_YO_SHI)
}
}
}
.onEach { println(it) }
.takeWhile { it != KI_YO_SHI }
.collect()
ズンドコキヨシ ver.1
@FlowPreview
@ExperimentalCoroutinesApi
suspend fun zunDokoKiyoshi() {
val zunZunZunZunDoko = listOf("ズン", "ズン", "ズン", "ズン", "ドコ")
val KI_YO_SHI = "キ・ヨ・シ!"
flow {
val source = listOf("ズン", "ドコ")
while (true) {
emit(source.shuffled().first())
delay(500)
}
}
.scan(emptyList<String>()) { list, s -> (list + s).takeLast(5) }
.dropWhile { it.isEmpty() }
.flatMapConcat { scanned ->
flow {
emit(scanned.last())
if (scanned == zunZunZunZunDoko) {
delay(500)
emit(KI_YO_SHI)
}
}
}
.onEach { println(it) }
.takeWhile { it != KI_YO_SHI }
.collect()
}
改良 & 一般化
ズンドコキヨシを一般化し、パラメータを次のように定義し、コードを改良していきます。
- 任意の型
T
のオブジェクトn
個から成るランダムデータ元(source
) - 任意の型
T
のオブジェクトm
個から成る終了条件パターン(pattern
) - 任意の型
T
の終端オブジェクト(terminal
)
まず、リストを引数に取り、ランダムに1個を選んで送出し続けるFlow
を返す関数を定義します。
送出間隔は生産側ではなく消費側で調節できるようにするためdelay
を削除しています。
fun <T> randomFlow(source: List<T>) = flow<T> {
while (true) {
emit(source.shuffled().first())
}
}
次に、flatMapConcat
の中身(キヨシチェックの部分)を関数として抽出します。
ここからもdelay
を削除し、flowOf
を使ってシンプル化しています。
パターンにマッチした場合は、terminal
の後にnull
を送出しています(理由は後述)。
fun <T> terminateIfSatisfiedFlow(
scanned: List<T>,
pattern: List<T>,
terminal: T
): Flow<T?> =
if (scanned == pattern) {
flowOf(scanned.last(), terminal, null)
} else {
flowOf(scanned.last())
}
最後に、ジェネリック・ズンドコキヨシの実装です。
takeWhile
による終了条件を!= terminal
から!= null
に変更しています。
これは、terminal
自身を終了条件にしてしまうと最終的なストリームにterminal
が流れなくなるためです。
最後のfilterNotNull()
はFlow<T?>
をFlow<T>
にアンラップするために用いています。
ジェネリック・ズンドコキヨシ(完成版)
@FlowPreview
@ExperimentalCoroutinesApi
fun <T> genericZDK(
source: List<T>,
pattern: List<T>,
terminal: T
): Flow<T> =
randomFlow(source)
.scan(emptyList<T>()) { list, e -> (list + e).takeLast(pattern.size) }
.dropWhile { it.isEmpty() }
.flatMapConcat { scanned -> terminateIfSatisfiedFlow(scanned, pattern, terminal) }
.takeWhile { it != null }
.filterNotNull()
テストします。
onEach
内でdelay
することにより、送出テンポを調整できます。
@Test
fun getWild() = runBlocking {
genericZDK(
source = listOf("And", "Get", "Tough", "Wild"),
pattern = listOf("Get", "Wild", "And", "Tough"),
terminal = "退勤"
)
.onEach { delay(200) }
.collect { println(it) }
}
...
Get
And
And
And
And
Tough
Get
Get
And
Wild
Tough
Get
Wild
And
Tough
退勤
ジェネリクス化したことにより、equals
で比較可能な型であれば何でも適用することができます。
@Test
fun threeSeven() = runBlocking {
genericZDK<Any>(
source = (0..9).toList(),
pattern = listOf(7, 7, 7),
terminal = "You are lucky!"
)
.onEach { delay(50) }
.collectIndexed { index, value ->
print(value)
if (index % 10 == 9) {
println()
} else {
print(" ")
}
}
}
...
2 3 5 2 9 9 2 2 9 4
7 1 7 1 4 0 6 7 4 8
6 1 4 3 7 6 2 0 4 4
4 2 8 6 2 0 5 0 7 5
2 3 8 0 8 2 0 1 5 1
6 2 4 7 6 8 4 1 7 9
8 1 9 5 3 1 2 0 4 5
4 3 7 7 7 You are lucky!
おわりに
Coroutines Flow の優れた点はemit
が suspend 関数になっていることで、今回のように無限にズンドコ
を送出するFlow
を定義したとしても消費側でその速度を調節したり、「もういらないよ」と伝える(takeWhile
)ことで処理をキャンセルすることができます。