背景
- StateFlowのonEach処理が意図せず停止することがある
原因
- onEach(collect)内でSharedFlowへのemit時にsuspendしていた
onEach(collect)が停止するパターン
// MutableSharedFlowはデフォルトではバッファリングできないためコンシューマが存在しない場合emit()時にsuspendする
private val _events = MutableSharedFlow<Events>()
val events = _events.asSharedFlow()
var timeOutJob: Job? = null
batteryReceiver.batteryReceiver.onEach {
when (it) {
Intent.ACTION_POWER_CONNECTED -> {
timeOutJob?.cancel()
// ここでeventsのconsumerがいないタイミングでsuspendしていたのが原因
_events.emit(Events.ChargeConnected)
}
Intent.ACTION_POWER_DISCONNECTED -> {
// 遅延実行
timeOutJob = defaultScope.launch {
delay(5000)
_events.emit(Events.ChargeDisconnected)
}
}
}
}.launchIn(defaultScope)
onEach(collect)が停止しないパターン
// replay=1を指定してバッファリング(consumerの収集開始時に過去1件の値を再送信する)
// onBufferOverflow=BufferOverflow.DROP_OLDESTを指定し、最も古いアイテムをドロップして、新しいアイテムをバッファに追加する
private val _events = MutableSharedFlow<Events>(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val events = _events.asSharedFlow()
var timeOutJob: Job? = null
batteryReceiver.batteryReceiver.onEach {
when (it) {
Intent.ACTION_POWER_CONNECTED -> {
timeOutJob?.cancel()
// eventsのconsumerがいない場合でもsuspendしなくなる
_events.emit(Events.ChargeConnected)
}
Intent.ACTION_POWER_DISCONNECTED -> {
// 遅延実行
timeOutJob = defaultScope.launch {
delay(5000)
_events.emit(Events.ChargeDisconnected)
}
}
}
}.launchIn(defaultScope)