Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
0
Help us understand the problem. What is going on with this article?
@kaoru

Kotlin CoroutineのFlowで一定期間データが取得できない場合のタイムアウト処理を書くには

背景

例えば株価をリアルタイムに画面に表示することを考えてみます。
ネットワークの調子が悪いときに現在の価格を消して「取得できません」と出したいようなときどう書くのがシンプルか考えてみました。

方法

suspend funの場合のタイムアウト

まず普通のsuspend funであれば簡単です。
withTimeoutやwithTimeoutOrNullで囲んでやればタイムアウト処理を書けます。

    withTimeout(1000L) {
        delay(2000)
    }

Flowの場合のタイムアウト

ではFlowの場合はどうすればよいでしょうか?

ぱっと見てぴったりな標準関数は見当たりませんが、select { } を使えば簡単に実現できます。

以下説明用のため汎用性はありませんがFlow版のwithTimeoutを作ってみました。

fun Flow<String>.withTimeout(coroutineScope: CoroutineScope, timeMills: Long) = flow {
    val receiveChannel = produceIn(coroutineScope)

    while (coroutineScope.isActive) {
        select<Unit> {
            onTimeout(timeMills) {
                emit("タイムアウトしました")
            }

            receiveChannel.onReceive {
                emit(it)
            }
        }
    }
}

サンプルコードと実行結果

全体のコードとしては以下のようになります。

package example

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withTimeout
import kotlin.coroutines.EmptyCoroutineContext

suspend fun main() {
    val job = SupervisorJob()
    val coroutineScope = CoroutineScope(EmptyCoroutineContext + job)

    coroutineScope.launch {
        val exampleFlow: Flow<String> = flow {
            delay(5_000)
            emit("1")
            delay(1_000)
            emit("2")
            delay(5_000)
            emit("3")
            emit("END")
        }

        exampleFlow
            .withTimeout(coroutineScope, 2_000)
            .distinctUntilChanged()
            .collect {
                println(it)

                if (it == "END") {
                    coroutineScope.cancel()
                }
            }
    }.join()
}

fun Flow<String>.withTimeout(coroutineScope: CoroutineScope, timeMills: Long) = flow {
    val receiveChannel = produceIn(coroutineScope)

    while (coroutineScope.isActive) {
        select<Unit> {
            onTimeout(timeMills) {
                emit("タイムアウトしました")
            }

            receiveChannel.onReceive {
                emit(it)
            }
        }
    }
}

suspend fun aa() {
    withTimeout(1000L) {
        delay(2000)
    }
}

出力のサンプルは以下です。

タイムアウトしました
1
2
タイムアウトしました
3
END
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
0
Help us understand the problem. What is going on with this article?