Edited at
KotlinDay 5

入門Kotlin coroutines

More than 1 year has passed since last update.


前書き

coroutineはKotlin 1.1から搭載された新機能ですが、まだexperimentalなこともあってバリバリ使っている人は多くないと思われます。しかしながら、experimentalとはいえproduction readyなことは宣言されており、Kotlin 1.3ではexperimentalが外れる(予定の)ため、そろそろ学んでおいても悪くない時期になってきています。

そこで、この記事ではcoroutineを実践するにあたって抑えておくと役立つ知見を共有します。ご参考になれば幸いです!

注: この記事はKotlin 1.2.21, kotlinx.coroutines 0.22.5に基づいて書かれています。


coroutineとは何であって何でないのか


coroutineとは何か

coroutineとは何かを一言で表現すると、「中断可能な計算インスタンス」になります。ここでいう計算インスタンスとは、 Thread クラスのインスタンスのように特定のコードブロックが紐付いたインスタンスのことです。coroutineもインスタンスを作成し、紐付いたコードブロックを実行するという点では Thread と同じです。しかし、一度 start したらそのまま最後までコードブロックが実行される Thread とは異なり、coroutineは実行の途中で処理を中断することができ、またその中断したところからそのままの状態で実行を再開することができます。

launch(UI) { // coroutineの作成 & 実行

val newItem = Item(...)
val token = getToken() // APIからTokenを取得する。結果が返るまでcoroutineを中断する
val item = postItem(token, newItem) // ItemをPOSTする。状態はそのままなのでnewItemを普通に参照できる
updateUI(item)
}

getToken を呼び出してから結果が返るまでと postItem を呼び出してから結果が返るまでの間はcoroutineが中断していて、実行スレッドをブロックしません。このNon-blockingな性質がcoroutineにおいてもっとも重要な要素になります。


coroutineとは何でないのか

よく比較されていますが、coroutineはRxJavaの代替ではありません。coroutineは前述のように「中断可能な計算インスタンス」ですが、RxJavaは「データフローを宣言的に書くリアクティブプログラミング用フレームワーク」です。こうして並べてみると比べるものではないことは一目瞭然ですが、それでは何故比較されているかといえば、RxJavaはPromiseパターンのスーパーセットでもあり、coroutineはPromiseパターンを同期的に書く手法(async/awaitパターン)を提供しているからです。特にAndroid開発においては標準の非同期処理の手法が使いづらかったため、RxJavaはリアクティブプログラミングのためにではなく、単なるPromiseとして活用されているケースが多くありました。そのようなケースにおいては確かにasync/awaitは代替となりうることから、本来同じ抽象レイヤーにいないこの2者が比較されることになったのです。最近は有識者による啓蒙活動によりこの誤解は解けつつありますが、きちんと理解しておきたいポイントです。


coroutineの中断

coroutineは中断できると述べましたが、任意の場所で勝手に中断されるわけではありません。coroutineの中断は中断関数(suspending function)の呼び出しによって行われます。

suspend fun getToken(): Token

中断関数とはcoroutineまたは別の中断関数からしか呼び出せない特別な関数で、 suspend 修飾子を付けて宣言します。1つ覚えておきたいことは、中断関数は常にcoroutineを中断するとは限らないということです。同期処理を行うように中断関数を実装することもできます。これは主にネットワークリクエストとキャッシュなど、状況によって同期処理をしたり非同期処理をしたりするのを1つの中断関数で表現したりする場合に発生します。もちろん、常に同期処理を行う関数は中断関数として宣言する必要はありません(通常の関数でいいですよね)。


コールバックスタイルから中断関数への変換

中断関数はどのようにしてcoroutineの中断を実現しているのでしょうか? まずは代表的な中断関数の作り方を見てみましょう。

// よくあるコールバックスタイルの関数

fun getTokenCallback(onSuccess: (Token) -> Unit, onError: (Throwable) -> Unit) { /* ... */ }

// 上記関数の中断関数化
suspend fun getToken(): Token = suspendCoroutine { cont ->
getTokenCallback({ token ->
cont.resume(token)
}, { throwable ->
cont.resumeWithException(throwable)
})
}

suspendCoroutine はコールバックスタイルの関数を中断関数に変換するための関数で、以下のように宣言されています。

public inline suspend fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T

public interface Continuation<in T> {
public val context: CoroutineContext
public fun resume(value: T)
public fun resumeWithException(exception: Throwable)
}

suspendCoroutine に渡すラムダの引数である Continuation<T> は継続渡しスタイル(CSP; Continuation passing style)を実現するためのインターフェースです。処理が成功した場合は resume を呼び出し、失敗した場合は resumeWithException を呼び出します。

中断関数である getToken がcoroutineから呼び出されると、内部ではコールバックスタイルの getTokenCallback が呼び出されます。 getTokenCallback の呼び出しは非同期なので即座に終了し、その時点でcoroutineが中断します。 getToken 呼び出し以降の処理は、 Continuationresume or resumeWithException が呼び出されてcoroutineが再開されることで実行されます。

つまるところ、中断関数の中身はコールバックスタイルの関数です。コールバックで完了後の処理を書く代わりに、同期的な処理と同じように次の行に処理を書くことができるわけです。

// コールバックで書く場合

val newItem = Item(...)
getToken { token ->
postItem(token, newItem) { item ->
updateUI(item)
}
}

// coroutineで書く場合
launch(UI) {
val newItem = Item(...)
val token = getToken()
val item = postItem(token, newItem)
updateUI(item)
}

これが実行スレッドをブロックしない中断の正体です。


coroutineの作成と並列処理

中断関数でcoroutineを中断できることはわかりました。それでは肝心のcoroutineはどうやって作成するのでしょうか? それにはcoroutineビルダーを使います。


launch coroutineビルダー

もっともシンプルなcoroutineビルダーが launch です。

launch {

// 中断関数が書ける
}

launch の引数のラムダは suspend 修飾子付きの中断ラムダ(suspending lambda)なので、中で中断関数を呼び出すことができます。戻り値は Job インターフェースで、coroutineの実行をキャンセルしたり、完了まで中断したりできます。

fun cancelTest() {

val job = launch { /* ... */ }
job.cancel() // キャンセル
}

suspend fun joinTest() {
val job = launch { /* ... */ }
job.join() // ↑のcoroutineが完了するまで中断
}

join は中断関数なので、別のcoroutineまたは中断関数からしか呼び出せない点は注意が必要です。


async coroutineビルダー

よく使うcoroutineビルダーはもう1つあります。 async coroutineビルダーです。

suspend fun awaitTest() {

val deferred = async {
// ...
return@async 10
}
val result = deferred.await()
println("result = $result") // "result = 10" が出力される
}

fun cancelTest() {
val deferred = async { /* ... */ }
deferred.cancel() // DeferredはJobを継承しているのでキャンセルも呼べる
}

こちらは launch と異なり値を返すことができるcoroutineを生成します。戻り値の型は Deferred<T> で、中断関数である await を呼び出すとそのcoroutineの結果を受け取ることができます。


coroutineビルダーの使い時

中断関数はcoroutineまたは別の中断関数からしか呼び出せません。そのため、中断関数を使用するにはどこかでエントリーポイントとなるcoroutineを作成する必要があります。そのために使うべきなのは launch coroutineビルダーです。エントリーポイントcoroutineの外は中断不可能な世界なので、 async coroutineビルダーを使ったとしても await で結果を取ることができず、意味がないためです。

もう1つの使い所が並列処理をする場合です。

suspend fun taskA(): Int { /* ... */ }

suspend fun taskB(): Int { /* ... */ }

suspend fun parallelSum(): Int {
val a = async { taskA() }
val b = async { taskB() }
return a.await() + b.await()
}

中断関数はそのまま並べて書くと直列に実行されますが、並列に実行したい場面ももちろんあります。その際は async coroutineビルダーを使って新しくcoroutineを作り、それを await することで結果の待ち合わせを行います。


デフォルト並列はclassic way

C#などの先行してasync/awaitパターンを実現している言語は、デフォルトが並列処理になっていて、直列処理が必要なときにawaitする書き方が主流です。Kotlinで書くと以下のようなイメージです。

fun taskA(): Deferred<Int> = async { /* ... */ }

fun taskB(a: Int): Deferred<Int> = async { /* ... */ }

fun serialTask() = async {
val a = taskA().await()
return taskB(a).await()
}

Kotlin coroutinesではこのような書き方は非推奨です。通常の関数が一行ずつ上から実行されるように、我々は並列処理よりも直列処理のほうに慣れ親しんでいます。また、一般的には並列処理よりも直列処理をしたい機会のほうが多く、そのたびに await を書く必要があるのは冗長です。さらにいえば、この書き方はerror-proneです。

fun taskA() = launch { /* ... */ }

fun someTask() = async {
// いろいろな処理
taskA().join()
// taskAの後に実施したい色々な処理
}

上記のコードはjoin を忘れたとしてもコンパイルが通りますが、 taskA が終わっている前提でいろいろな処理を行っている場合、再現の難しいバグを引き起こしかねません。

以上の理由により、Kotlinでは中断関数を使ってデフォルトを直列処理にする書き方を使います。

suspend fun taskA(): Int { /* ... */ }

suspend fun taskB(a: Int): Int { /* ... */ }

suspend fun serialTask(): Int {
val a = taskA()
return taskB(a)
}

publicな関数定義には JobDeferred<T> を使わずに中断関数を使用し、並列処理が必要になったらその場で async / await を書くのがKotlin流です。

KotlinConf 2017ではJetBrainsの中の人が「並列処理がデフォルトのasync/awaitはclassic wayだ」と発言しましたが、その背景にある哲学は「並列処理は直列処理よりも難しく間違えやすいのだから、並列処理であることを明示すべき」というものなのです。


実践レシピ

coroutineとは何なのか、どういう方針・哲学で使っていけばいいのかを理解したところで、実際に使っていくにあたって理解すべきことを見ていきましょう。


CoroutineContext

CoroutineContext とは、coroutineの内部状態をもろもろ持ってくれている神クラスです。詳細を語り始めると深すぎて終わらないので、ここでは CoroutineContext の重要な側面である、coroutineの実行スレッドを決定するという機能にだけ焦点を当てます。これまで launchasync といったcoroutineビルダーを使ってきましたが、これらには CoroutineContext を引数に渡すことで、coroutineの実行スレッドを指定することができます。

launch(UI) { /* UIスレッドで実行 */ }

async(CommonPool) { /* スレッドプールで実行 */ }

よく使うのはUIスレッドで実行してくれる CoroutineContext (Androidだと UI 、Swingだと Swing など)と、スレッドプール上で実行してくれる CommonPool です。coroutineビルダーの CoroutineContext のデフォルト引数は DefaultDispatcher インスタンスで、これはJVMでは CommonPool と(今のところ)同一です。

基本的にはエントリーポイントとなるcoroutineは UI コンテキストで実行し、並列処理に使うcoroutineには CommonPool を指定することになるでしょう。


実行スレッドのスイッチ

coroutineを作成するときに実行スレッドが指定できるということはわかりました。では、たとえばIO処理などワーカースレッドで行わせたい処理がある場合などは、毎回coroutineビルダーを使うべきなのでしょうか? 答えはNoです。

suspend fun someTaskInWorkerThread() {

withContext(CommonPool) {
// スレッドプール上で実行される
// 中断ラムダ内なので中断関数も使える
}
}

withContext 中断関数は CoroutineContext を切り替えるための関数です。 withContext の中では新しいcoroutineが(必要ならば)生成され、完了まで中断されます。たとえばワーカースレッドでしか実行すべきでないコードなことがわかっている場合は、上記のように記述することでスレッドの指定責務を呼び出し元に押しつける必要がなくなります。


coroutineの親子関係

突然ですが、以下のコードはどのような出力を行うと思いますか?

val job = launch {

launch {
delay(500) // delayは指定された時間だけ中断する関数です
println("done")
}
}
job.cancel()
println("cancel")

答えは以下の通りです。

cancel

done

キャンセルされたのは外側のcoroutineだけなので、内側のcoroutineは最後まで実行されます。この挙動を変えてキャンセルを伝搬したい場合はcoroutineの親子関係を設定します。

val job = launch {

launch(CommonPool + coroutineContext) { /* ... */ }
}

coroutineビルダーに渡すラムダには CoroutineScope 型のレシーバーが設定されており、その coroutineContext プロパティからcoroutineに紐付いている CoroutineContext にアクセスできます。親となるcoroutineの CoroutineContext+ 演算子で合成することで、coroutineの親子関係を指定できます。

それでは以下のように中断関数内で新しくcoroutineを作る場合はどうすればいいのでしょうか?

suspend fun invokeNewCoroutine() {

launch { // 親coroutineと紐付けたい!
// ...
}
}

中断関数内から現在の CoroutineContext にアクセスする方法はKotlin 1.2.20で入りそうですが、それまでは以下のスニペットをworkaroundとして使用できます。

import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn // intrinsicsなので手書きimportが必要

suspend fun coroutineContext(): CoroutineContext = suspendCoroutineOrReturn { it.context }

suspend fun invokeNewCoroutine() {
launch(CommonPool + coroutineContext()) { /* ... */ }
}

クライアントアプリの場合、画面の終了と同時にエントリーポイントのcoroutineをすべてキャンセルするという挙動は一般的です。その際に子coroutineも一緒にキャンセルしたい場合などに有効な手法です。


例外ハンドリング

coroutineにおける例外ハンドリングはいつもと特に変わりありません。つまり、try catch を使います。

launch {

val a = async { taskA() } // 失敗するかも
val b = async { taskB() } // 失敗するかも
try {
println("result = ${a.await() + b.await()}")
} catch (t: Throwable) {
// エラーハンドリング
}
}

async coroutineビルダーで作成したcoroutine内で発生した例外は、そのcoroutineを await する際に再送出されます。一方、 launch coroutineビルダー内で発生した例外はそのまま UncaughtExceptionHandler に回されます。coroutineの作成元に例外を伝えたい場合は async coroutineビルダーを使わなければいけない点には注意が必要です。


並列処理のキャンセル

またまた問題です。以下のコードはどのような出力を行うと思いますか?

suspend fun taskA(): Int {

delay(500)
throw RuntimeException()
}

suspend fun taskB(): Int {
delay(1000)
println("taskB done")
return 1
}

suspend fun task() {
val a = async { taskA() }
val b = async { taskB() }

try {
println("result = ${a.await() + b.await()}")
} catch (t: Throwable) {
println("error")
}
}

この流れから予想が付くかもしれませんが、答えは以下の通りです。

error

taskB done

taksAtaskB のどちらかが失敗したときはもう片方をキャンセルしたいという場合は、以下のような関数を定義すると便利です。

inline fun <T> withCancellation(vararg jobs: Job, block: () -> T): T {

try {
block()
} catch (t: Throwable) {
jobs.forEach { it.cancel(t) }
throw t
}
}

suspend fun task() {
val a = async { taskA() }
val b = async { taskB() }

try {
withCancellation(a, b) {
println("result = ${a.await() + b.await()}")
}
} catch (t: Throwable) {
println("error")
}
}


終わりに

coroutineによる非同期処理について、一通り見てきました。本当は Channelselect の話や actor の話もしたかったのですが、既に記事が長くなってしまったのでまた別の機会に書きたいと思います。

coroutineはNon-blockingな非同期処理を同期処理と統一的な書き方で扱える便利な道具ですので、是非使ってみてください。