はじめに
最近は Kotlin の Flow を色々な場面で使うことが多くなりました。
Android のプログラミングだと、例えば Preferences DataStore から値を読む際に Flow が用いられていますし、Room を介してデータベースから読み出す際にも Flow を使えばデータベース更新時に新しい値が自動的に流れてきます。
データを時系列的に順々に処理する場合にも便利な仕組みだと言えましょう。
しかし色々と便利に使っている割には emit 関数で Flow に値を流し、collect 関数で流れてきた値を受け取るという程度でしか理解していませんでした。
また Flow のドキュメントを読んでいくと Flow は Cold (collect するまで送信側のコードは実行されない) だと書いてあったりします。
しかし一方では MutableSharedFlow や MutableStateFlow 等は Hot(collect に関わらず送信側のコードは Hot Flow に直接 emit できる) だったりして、 結局 Flow って Cold なのか Hot なのかと混乱してきます。
そこでここでは Flow のソースコードを元に超簡易版を自分で実装してみることで、どのような仕組みになっているのか調べてみようと思います。
また Cold Flow と Hot Flow の両方を実装し、その違いも見てみたいと思います。
Cold Flow の実装
まずは通常の Cold Flow を実装してみたいと思います。
簡単のために以下のような仕様とします。
- マルチスレッドは考慮しない
- コルーチンのキャンセルも考慮しない
- ジェネリクスはコードを読むのに煩わしいので Int型に限定する
以下に実装したコードを示します。
// fun interface means "functional (SAM) interface"
fun interface MyFlowCollector {
suspend fun emit(value: Int)
}
interface MyFlow {
suspend fun collect(collector: MyFlowCollector)
}
class MyColdFlow(private val block: suspend MyFlowCollector.() -> Unit) : MyFlow {
override suspend fun collect(collector: MyFlowCollector) {
collector.block()
}
}
とたったこれだけで出来てしまいます。
まず Flow を作成する時には、Flow の入力側で実行する関数を定義します。
val myFlowObject = MyColdFlow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
このラムダ関数は MyFlowCollector の拡張関数として定義されます。
分かりやすいようもっと明示的に書くと以下のようになります。
suspend fun MyFlowCollector.doAtFlowInput() {
for (i in 1..3) {
delay(100)
emit(i)
}
}
val myFlowObject2 = MyColdFlow(MyFlowCollector::doAtFlowInput)
もうちょっと Kotlin の Flow 風に書くなら
fun myFlow(block: suspend MyFlowCollector.() -> Unit): MyFlow = MyColdFlow(block)
の様な関数を用意して
val myFlowObject3 = myFlow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
の様にも出来ます。
そして MyFlowCollector オブジェクトを作成し、Flow の出力側の処理を定義します。
それを MyFlow オブジェクトの collect 関数の引数に渡します。
val collector = object : MyFlowCollector {
override suspend fun emit(value: Int) {
println("collected value = $value")
}
}
suspend fun testMyColdFlow() {
myFlowObject.collect(collector)
}
また、よく見る書き方のように SAM(Single Abstract Method) 変換を用いて
suspend fun testMyColdFlow2() {
myFlowObject.collect { value ->
println("collected value = $value")
}
}
と書いても良いでしょう。
collect 関数を実行すると MyColdFlow の collect 関数の実装に見るように、引数として渡された MyFlowCollector オブジェクトをレシーバーとして先程 MyColdFlow オブジェクト作成時に定義した関数(MyFlowCollector.doAtFlowInput)が呼ばれます。
この関数の中で emit 関数が呼ばれると MyFlowCollector オブジェクト作成時に定義した emit 関数が呼ばれ、この例なら println("collected value = $value")
が実行されることになります。
これを見ると "Flows are cold streams" と言われるのが良く分かると思います。
collect 関数を実行すると Flow 作成時に定義した拡張関数が実行され、そこから emit 関数が呼ばれてコレクターに値が渡されます。
デザインパターンの Observer パターンに慣れているなら emit → update、collect → addObserver(and run) の様に読みかえると腑に落ちるかもしれません。
Hot Flow の実装
MutableSharedFlow や MutableStateFlow の様な Flow は Hot Flow と呼ばれます。
ここでは MutableSharedFlow のソースコードを参考に超簡易版の Hot Flow を実装してみます。
簡単のために以下のような仕様とします。
- マルチスレッドは考慮しない
- collect 関数は無限ループになるためコルーチンの cancel 関数には対応し、停止するようにする
- ジェネリクスはコードを読むのに煩わしいので Int型に限定する
- SharedFlow は複数のコレクターを持てるが単一のコレクターのみをサポートする
- コレクターがいない場合には送ったデータは消えてしまう
- バッファー機能は無し
以下に実装したコードを示します。
class MyHotFlow : MyFlow, MyFlowCollector {
// emitで送られてきた値を保持する。
private var value: Int? = null
// コレクターがいるかどうかのフラグ
private var hasCollector: Boolean = false
// 値が送られてくるまでcollect関数がawaitValue関数でsuspendしている時、
// それを起こすためのContinuation
private var collectCont: Continuation<Unit>? = null
// 前に送った値がcollectされるまでemit関数がawaitCollect関数でsuspendしている時、
// それを起こすためのContinuation
private var emitCont: Continuation<Unit>? = null
// emit関数で送られた値を引数のcollectorオブジェクトに渡す。
// 一度呼ばれたら無限ループで戻らない。
// Coroutineのcancel関数で終了する。
override suspend fun collect(collector: MyFlowCollector) {
// コレクターが存在することを示すフラグを立てる
hasCollector = true
// 無限ループ
while (true) {
// valueに値が設定されたらそれを取得しvalueをnullで初期化し
// emit側がsuspendしていたら起こす。そしてcollectorオブジェクトに値を渡す。
// valueに値が設定されていなかったらawaitValueでsuspendする。
val newValue = value
value = null
if (newValue != null) {
emitCont?.resume(Unit)
emitCont = null
collector.emit(newValue as Int)
} else {
awaitValue()
}
}
}
// suspendCancellableCoroutineでsuspendする。
// 起こしてもらうためにcollectContにContinuationを渡す。
private suspend fun awaitValue() =
suspendCancellableCoroutine { c: Continuation<Unit> ->
collectCont = c
}
override suspend fun emit(value: Int) {
// コレクターがいなければ何もしない。送られてきた値は消える。
if (!hasCollector) return
// 前に送られてきた値がまだ取得されていなければsuspendする。
if (this.value != null) {
awaitCollect()
}
// 送られてきた値を保存し、collect側が値待ちでsuspendしていたら起こす。
this.value = value
collectCont?.resume(Unit)
collectCont = null
}
// suspendCancellableCoroutineでsuspendする。
// 起こしてもらうためにemitContにContinuationを渡す。
private suspend fun awaitCollect() =
suspendCancellableCoroutine { c: Continuation<Unit> ->
emitCont = c
}
}
使用例は以下のような感じになります。
suspend fun testMyHotFlow() {
val myHotFlow = MyHotFlow()
coroutineScope {
val j1 = launch {
myHotFlow.collect {
println("collected value from myHotFlow = $it")
}
}
val j2 = launch {
delay(100)
myHotFlow.emit(1)
delay(100)
myHotFlow.emit(2)
delay(100)
myHotFlow.emit(3)
}
delay(400)
j1.cancel()
j2.cancel()
}
}
MyHotFlow クラスは MyFlow インターフェイス以外に (emit 関数を持つ) MyFlowCollector インターフェイスも実装します。これにより外部から値を受け取る機能を持つようになります。
動作は以下のようになります。
- collect 関数は emit 関数が値を送ってくるまで suspend する
- emit 関数が値を value プロパティに保存する。その際、collect 関数が suspend していたらそれを起こす
- collect 関数は起こされたら value プロパティに保存された値を、引数である collector オブジェクトに collector.emit 関数で送る
- collect 関数は引数である collector オブジェクトに値を送った後、 value プロパティを null で初期化し、送信が終了したことを示す
- collect 関数は引数である collector オブジェクトに値を送った後、 emit 関数が suspend していたらそれを起こす
- collect 関数が呼ばれる前に emit 関数が呼ばれた場合は何もしない。送ってきた値は捨てられる
- emit 関数で値を送る際にまだ前の値が残っていたら (value プロパティが null でなかったら) suspend する
ColdFlow とは異なりだいぶ実装が複雑になり、まったく別物になっていることが分かります。
まとめ
以上、超簡易版の Cold Flow と Hot Flow を実装してみました。このように自分で必要最低限の実装をしてみると Flow がどのように動いているのかがよく分かります。
また Flow インターフェイス自体は Cold や Hot には関係なく、その実装で Cold か Hot であるかが決まることが分かりました。
なにか新しいものが出てきて何となくは使えるけれどいまいち良く分かっていない等の場合は、自分で実装してみるというのは理解するためになかなか良い手段だと感じました。(Flow はもう枯れた技術かもしれませんが ^^;)