はじめに
今回は coroutines の Flow の API である callbackFlow が便利という
噂を聞いたので callbackFlow について調べて動かしてみたいと思います。
// 動作確認には Kotlin Coroutines 1.4.2 を利用しています。
dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2"
testCompile group: 'junit', name: 'junit', version: '4.12'
}
callbackFlow を使ってみる
callbackFlow
はコールバックベースの API を Flow に変換するためのビルダーらしく、次のように callbackFlow
は定義されており block : suspend ProducerScope<T>.() -> Unit
でコールバックベースの API を Flow にする繋ぎこむを実行するようになっています。
@ExperimentalCoroutinesApi fun <T> callbackFlow(
block: suspend ProducerScope<T>.() -> Unit
): Flow<T> (source)
例えばですがコールバックベースの API でカウンタ値の変更を通知する Counter
クラスがあったとします。
interface OnChangedListener {
fun onChanged(count: Int)
}
class Counter {
private val listeners: MutableList<OnChangedListener> = mutableListOf()
var count = 0
private set(value) {
field = value
notifyOnChanged(value)
}
fun increment() {
count++
}
fun decrement() {
count--
}
fun addOnChangedListener(listener: OnChangedListener) {
println("addOnChangedListener")
listeners.add(listener)
}
fun removeOnChangedListener(listener: OnChangedListener){
println("removeOnChangedListener")
listeners.remove(listener)
}
private fun notifyOnChanged(count: Int) {
listeners.forEach { listener -> listener.onChanged(count) }
}
}
callbackFlow
を使えばこのようなコールバックベースの API である Counter
クラスを Flow に変換できるみたいです。
/**
* callbackFlow を生成する
*/
private fun createCallbackFlow(counter: Counter): Flow<Int> = callbackFlow {
// リスナー生成に失敗した場合は close を呼び出して Flow を終了させる
val hasError = false
if (hasError) {
close()
}
// callbackFlow で作成した Flow にデータ更新を通知するリスナーを作成する
val listener = object : OnChangedListener {
override fun onChanged(count: Int) {
// callbackFlow から ProducerScope が渡されるようになっていて send や offer でデータを送信すると、
// callbackFlow から生成した Flow の collect でデータを受信できるようになっているみたいです。
// callbackFlow では内部的に SendChannel を利用して cold flow を生成しているようなのですが、
// callbackFlow で渡されるのは SendChannel を継承した ProducerScope が渡されるようになっています。
offer(count)
}
}
// callbackFlow で作成した Flow にデータ更新を通知するリスナーを登録する
counter.addOnChangedListener(listener)
// callbackFlow で作成した Flow がキャンセルされるまで待ち、キャンセルされたらリスナーを解除する
awaitClose {
counter.removeOnChangedListener(listener)
}
}
callbackFlow での Flow 変換処理が完成したら通常の Flow と同じように collect を呼び出せば更新したデータを受信します。
val counter = Counter()
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
/**
* callbackFlow が起動された時の動作
*/
private fun callbackFlowSample() = runBlocking {
// 特定の scope で callbackFlow を起動してデータ購読を開始する
scope.launch {
val flow: Flow<Int> = createCallbackFlow(counter)
flow.collect { println("count:$it") }
}
delay(100)
repeat(10) { counter.increment() }
repeat(10) { counter.decrement() }
delay(100)
// callbackFlow を起動した scope をキャンセルしてデータ購読を終了する
scope.cancel()
}
すると Flow で更新されたデータが受信できるようになります。
しっかり Coroutines がキャンセルされた時点で removeOnChangedListener が呼び出されています。
addOnChangedListener
count:1
count:2
count:3
count:4
count:5
count:6
count:7
count:8
count:9
count:10
count:9
count:8
count:7
count:6
count:5
count:4
count:3
count:2
count:1
count:0
removeOnChangedListener
おわりに
今回は callbackFlow を使ってコールバックベースの API を Flow に変換してみました。callbackFlow の強みは CoroutineScope のライフサイクルに応じてデータ購読を解除できる点かなと思います。Android アプリ開発などのライフサイクルの管理が難しいシチュエーションではかなり力を発揮するのではないかと思いました。