LiveData: 2.2.0-alpha1のliveData {}によるコルーチン対応についてドキュメントやGoogle I/Oの動画とコードを読む

Google I/Oで発表されており、割とKotlin Coroutinesを使ったアプリの作り方に影響するよう内容だったので、調べてみました。

利用例として、Kotlin CoroutiensでAPIやDBを操作して、LiveDataで返したいというのはかなりよくあるパターンです。

これを利用すると単純になるだけでなく、よくあるうまく動かないパターンなどを解決してくれるみたいです。

ここにソースコードがあります。

https://android.googlesource.com/platform/frameworks/support/+/7a4525ac3c56adff5e0de713ca54eae89518e28c/lifecycle/livedata/ktx/src/main/java/androidx/lifecycle/CoroutineLiveData.kt

まずはコードを見るとわかりやすいと思います。 liveData{} のようにブロックを渡しており、そのブロックで emit() を呼ぶと値がLiveDataに入ります。

val data : LiveData<Int> = liveData {

delay(3000)
emit(3)
}


ドキュメント(KDoc)とソースコードを読む

ドキュメントと、それに対するコードを読んでいきましょう。


LiveDataの状態と引数のブロックの関係について


ドキュメント

LiveDataがactiveになったときに引数のブロックが実行されます。(つまり例えばliveData.observe(activity){}だとActivityがonStart以降のときに実行される感じです。)



  • The [block] starts executing when the returned [LiveData] becomes active ([LiveData.onActive]).



コード

まずはliveData{}の呼び出しを見に行きましょう。以下のようになっており、基本的にCoroutineLiveData()でインスタンス化しているだけです。

fun <T> liveData(

context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
// **** ここに引数のブロックがいます ****
@BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
// **** CoroutineLiveData()をインスタンス化 ****

CoroutineLiveDataの実装を見ていきましょう。init()でBlockRunnerをインスタンス化し、onActive()でblockを実行しようとします。(BlockRunnerの実装は後ほど)

internal class CoroutineLiveData<T>(

context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
// **** ここに引数のブロックがいます ****
block: Block<T>
) : MediatorLiveData<T>() {
private var blockRunner: BlockRunner<T>?
...

init {
val supervisorJob = SupervisorJob(context[Job])

blockRunner = BlockRunner(
liveData = this,
// **** ここに引数のブロックがいます ****
block = block,
timeoutInMs = timeoutInMs,
scope = scope
) {
blockRunner = null
}
}
...

override fun onActive() {
super.onActive()
// **** ここで実行される!! ****
blockRunner?.maybeRun()
}
...


LiveDataがinactiveになったときにどうなるか?


ドキュメント

ブロックの実行中にLiveDataがinactiveになった場合(つまりonStop()になった場合)、 再度activeにならずにtimeoutInMsで設定されている時間(デフォルト5秒)が経ったらブロックがキャンセルされます。これはActivityの画面回転でキャンセルさせないためです。(APIやDBのアクセス中に無駄にキャンセルされたり、例えばカウンターだったとして画面回転で初期化されて、0から戻るとつらいですよね?)

キャンセルされているブロックからのemitは無視されます。

Google I/Oの動画よりカウンターの例

image.png



  • If the [LiveData] becomes inactive ([LiveData.onInactive]) while the [block] is executing, it

  • will be cancelled after [timeoutInMs] milliseconds unless the [LiveData] becomes active again

  • before that timeout (to gracefully handle cases like Activity rotation). Any value

  • [LiveDataScope.emit]ed from a cancelled [block] will be ignored.



コード

コードは以下のようになっています。onInactive() ではblockRunner?.cancel()を呼んでいますね。BlockRunnerのコードも読んでみましょう。

internal class CoroutineLiveData<T>(

context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
// **** ここに引数のブロックがいます ****
block: Block<T>
) : MediatorLiveData<T>() {
private var blockRunner: BlockRunner<T>?
...

override fun onActive() {
super.onActive()
blockRunner?.maybeRun()
}

override fun onInactive() {
super.onInactive()
// **** blockRunnerでキャンセルもしていそう? ****
blockRunner?.cancel()
}
}

BlockRunnerでは実際に5秒間待つ処理をみることができます。以下のようになっており、実際に5秒間待ちます。

internal class BlockRunner<T>(

private val liveData: CoroutineLiveData<T>,
private val block: Block<T>,
private val timeoutInMs: Long,
private val scope: CoroutineScope,
private val onDone: () -> Unit
) {
...
@MainThread
fun cancel() {
if (cancellationJob != null) {
error("Cancel call cannot happen without a maybeRun")
}
cancellationJob = scope.launch(Dispatchers.Main) {
// **** ここで5秒間待つ ****
delay(timeoutInMs)
// **** もしActiveなObserverがいなければキャンセルを呼び出す ****
if (!liveData.hasActiveObservers()) {
// one last check on active observers to avoid any race condition between starting
// a running coroutine and cancelation
runningJob?.cancel()
runningJob = null
}
}
}
}


LiveDataのinactiveによってキャンセルされた後、LiveDataが再度Activeになったときはどうなるか?


ドキュメント

LiveDataのinactiveによってキャンセルされた後、LiveDataが再度Activeになったときはブロックは最初から再実行されます。

もしストップしたところから再実行したい場合 LiveDataScope.initialValue 関数を使うことが出来、それにより最後にemitされた値が取得できます。(LiveDataScopeとはliveData{}のラムダの中のthisがLiveDataScopeになっています。)



  • After a cancellation, if the [LiveData] becomes active again, the [block] will be re-executed

  • from the beginning. If you would like to continue the operations based on where it was stopped

  • last, you can use the [LiveDataScope.initialValue] function to get the last

  • [LiveDataScope.emit]ed value.


ブロックが成功したか、LiveDataがinactiveになる以外の理由でキャンセルになったら(Exceptionがthrowされるとそうなります)、activeになっても、もう一度ブロックが実行されることはありません。



  • If the [block] completes successfully or is cancelled due to reasons other than [LiveData]

  • becoming inactive, it will not be re-executed even after [LiveData] goes through active

  • inactive cycle.



コード

CoroutineLiveDataのinit処理を思い出しましょう。ブロックが実行が終わったらBlockRunnerのコンストラクタ最後の引数のラムダが呼び出され、 blockRunnerをnullにするため、 onActive() になったときも安全呼び出しによって再度実行される事はありませんが、途中でキャンセルされた場合は、nullにならないため、再度実行されます。

internal class CoroutineLiveData<T>(

context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
block: Block<T>
) : MediatorLiveData<T>() {
private var blockRunner: BlockRunner<T>?
...

init {
val supervisorJob = SupervisorJob(context[Job])

val scope = CoroutineScope(Dispatchers.Main + context + supervisorJob)
blockRunner = BlockRunner(
liveData = this,
block = block,
timeoutInMs = timeoutInMs,
scope = scope
) {
// ****このラムダは引数のブロックが終わったときに実行される!!****
blockRunner = null
}
}
...

override fun onActive() {
super.onActive()
// 安全呼び出し
blockRunner?.maybeRun()
}

一応、BlockRunnerのmaybeRunの実装も載せておきます。ここからブロックが終わったときにnullが入る流れがわかると思います。

internal class BlockRunner<T>(

private val liveData: CoroutineLiveData<T>,
// これがliveData{}に渡した引数のブロック
private val block: Block<T>,
private val timeoutInMs: Long,
private val scope: CoroutineScope,
// これがブロックが終わったときを知らせるもの
private val onDone: () -> Unit
) {
// currently running block job.
private var runningJob: Job? = null

// cancelation job created in cancel.
private var cancellationJob: Job? = null

@MainThread
fun maybeRun() {
cancellationJob?.cancel()
cancellationJob = null
if (runningJob != null) {
return
}
runningJob = scope.launch {
val liveDataScope = LiveDataScopeImpl(liveData, coroutineContext)
// ****ブロックの実行****
block(liveDataScope)
// ****ブロックの実行完了したらonDoneのコールバックを呼び出す!****
onDone()
}
}

ここはどういうことなのか見に行きましょう

もしストップしたところから再実行したい場合 `LiveDataScope.initialValue` 関数を使うことが出来、それにより最後にemitされた値が取得できます。(LiveDataScopeとは`liveData{}`のラムダの中のthisがLiveDataScopeになっています。)

一瞬勘違いしちゃうのですが、LiveDataScopeはCoroutinesのスコープとは何も関係なく、ただのインターフェースになっており、

それを実装したLiveDataScopeImplはブロックが実行される直前にインスタンス化されます。

以下はブロックの実行部分です。

        runningJob = scope.launch {

// **LiveDataScopeImplのインスタンス化**
val liveDataScope = LiveDataScopeImpl(liveData, coroutineContext)
// ****ブロックの実行****
block(liveDataScope)
onDone()
}

コンストラクタのデフォルト引数として、LiveData値を取得してinitialValueに入れることで、LiveDataScopeImplのinitialValueが代入されます。

internal class LiveDataScopeImpl<T>(

internal var target: CoroutineLiveData<T>,
context: CoroutineContext,
override val initialValue: T? = target.value
) : LiveDataScope<T> {


cancellationと連携するようにしよう

ベストプラクティスとして、ブロック内をcancellationと連携するようにしよう。

(おそらく、suspend functionを呼び出すコードにして、ちゃんとキャンセルされたときに終了するようにしてあげる必要があるということです。呼び出し先をsuspend functionを使っていれば問題ありません。)




ドキュメント内のサンプルを見てみる


他のLiveDataの変更を見つつ、30秒ごとに更新する例

observeされている間、userId に基づいて User オブジェクトを取得し、30秒ごとに更新する例です。

while(true) で大丈夫な理由は LiveDataが観測されなくなったら、 delay() がキャンセルと連携してくれるためです。

 * // a LiveData that fetches a `User` object based on a `userId` and refreshes it every 30 seconds

* // as long as it is observed
* val userId : LiveData<String> = ...
* val user = userId.switchMap { id ->
* liveData {
* while(true) {
* // note that `while(true)` is fine because the `delay(30_000)` below will cooperate in
* // cancellation if LiveData is not actively observed anymore
* val data = api.fetch(id) // errors are ignored for brevity
* emit(data)
* delay(30_000)
* }
* }
* }


APIの失敗でバックオフしつつリトライする例

待ち時間を2倍の時間にしていくバックオフで、データ取得をリトライする例

 * // A retrying data fetcher with doubling back-off

* val user = liveData {
* var backOffTime = 1_000
* var succeeded = false
* while(!succeeded) {
* try {
* emit(api.fetch(id))
* succeeded = true
* } catch(ioError : IOException) {
* delay(backOffTime)
* backOffTime *= minOf(backOffTime * 2, 60_000)
* }
* }
* }


キャッシュとAPIの両方のデータを流す例

何回でもemitできるので、問題なく、キャッシュもAPIからの取得したデータも流せます。

 * // a LiveData that tries to load the `User` from local cache first and then tries to fetch

* // from the server and also yields the updated value
* val user = liveData {
* // dispatch loading first
* emit(LOADING(id))
* // check local storage
* val cached = cache.loadUser(id)
* if (cached != null) {
* emit(cached)
* }
* if (cached == null || cached.isStale()) {
* val fresh = api.fetch(id) // errors are ignored for brevity
* cache.save(fresh)
* emit(fresh)
* }
* }


データベースからの変更を常に受け取る例

(emitSource()を使うとLiveDataで流れてくる内容をemitすることができます。)

databaseから瞬時にLiveDataを受け取り、そしてそれを情報源として利用し、サーバーからのデータも受け取るようにします。

Roomを使っているので、insert()によってデータベースが更新され、データベースの更新は fromDb LiveDataを更新します。

詳しくは https://developer.android.com/training/data-storage/room/accessing-data#query-observable

 * // a LiveData that immediately receives a LiveData<User> from the database and yields it as a

* // source but also tries to back-fill the database from the server
* val user = liveData {
* val fromDb: LiveData<User> = roomDatabase.loadUser(id)
* emitSource(fromDb)
* val updated = api.fetch(id) // errors are ignored for brevity
* // Since we are using Room here, updating the database will update the `fromDb` LiveData
* // that was obtained above. See Room's documentation for more details.
* // https://developer.android.com/training/data-storage/room/accessing-data#query-observable
* roomDatabase.insert(updated)
* }
* ```


Google I/Oの動画より

liveData {}の引数にCoroutinesDispatcherを渡すことで、IOのためのスレッドでブロックを実行させられます。LiveDataの値を変えるはずのemit()をIOのためのスレッド呼び出せるのはなぜなのか見てみましょう。

image.png

理由は簡単です、withContext()してCoroutinesDispatcherを切り替えているだけです。

internal class LiveDataScopeImpl<T>(

internal var target: CoroutineLiveData<T>,
context: CoroutineContext,
override val initialValue: T? = target.value
) : LiveDataScope<T> {
// use `liveData` provided context + main dispatcher to communicate with the target
// LiveData. This gives us main thread safety as well as cancellation cooperation
private val coroutineContext = context + Dispatchers.Main
...

// **** withContextが使われている!! ****
override suspend fun emit(value: T) = withContext(coroutineContext) {
target.clearSource()
target.value = value
}
}


まとめ

最初はなんのために必要なの?って思いましたが、コードを読んで確かに必要かなと思いました。

画面回転したときに終了しないというテクニックや、再実行のハンドリングなどは自分で実装すると少し骨が折れるものになる気がします。

このライブラリを使うとかなりいい感じに処理してくれるので、Kotlin Coroutinesによる実装でハマるポイントが減りそうです。