Edited at

kotlin coroutines の各種例とガイド / Guide to kotlinx.coroutines by example

注意:この記事はリリース前の文章を元にしたのですが、リリース時点で API の破壊的変更があったようですが内容をまだアップデートできていません。参考にお読み下さい。


Original: https://github.com/Kotlin/kotlinx.coroutines/blob/7861411fbce7f6c17e97dd2fb2961ebce5587930/README.md

Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.


上記の雑な訳。2018/08 時点。Apache 2.0


Guide to kotlinx.coroutines by example

This is a guide on core features of kotlinx.coroutines with a series of examples.


Introduction and setup

言語としての Kotlin は、他の各種ライブラリが使うような、最低限の low-level API 群だけを標準ライブラリとして提供する。他の多数の言語と違って、async await は Kotlin におけるキーワードではなくて、標準ライブラリの一部でもない。

kotlinx.coroutines はこのような(async / await を提供する)高レベルライブラリである。このガイドでカバーされる、数々の高レベルな coroutine-enabled primitives を含む。例えば launch, async 等。このガイドで使われている要素を利用するために kotlinx-coroutines-core を依存関係に追加することが必要である。


Table of contents


Coroutine basics

この章では coroutine 概念の基本を取り扱う


Your first coroutine

最初の coroutine

以下のコードを実行すると:

fun main(args: Array<String>) {

launch { // launch new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}


You can get full code here


以下の結果が得られる:

Hello,

World!

重要な事は、コルーチンは軽量スレッドだってことだ。launch coroutine builder によって起動される。

同じ結果は、launch { ... }thread { ... } に、 delay(...)Thread.sleep(...) に置換しても得る事ができる。

launchthread に置換したタイミングで以下のエラーが発生する:

Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function

これは、delay が launch の中でしか使えないために起こる。 delay は特殊な suspending function でスレッドを ブロックしない が、coroutine を suspends

する。)


Bridging blocking and non-blocking worlds

最初の例で、は non-blockingdelay(...)blocking Thread.sleep(...) が混じっていた。

こう言うコードでは簡単にどこがブロッキングで、どこがそうで無いかを見失ってしまう。

明示的な blocking を runBlocking coroutine builder を使った例は以下の通り:

fun main(args: Array<String>) { 

launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main thread continues here immediately
runBlocking { // but this expression blocks the main thread
delay(2000L) // ... while we delay for 2 seconds to keep JVM alive
}
}


You can get full code here


結果は同じで、ただコードが non-blocking delay を使うようになっただけだ。

runBlockingを呼び出した main スレッドは、runBlocking の中身が終了するまでブロックする。

この例はもっと Kotlin らしく以下の用に書き換えられる。runBlocking でmain 文全体を囲む効果がある。

fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine

launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main coroutine continues here immediately
delay(2000L) // delaying for 2 seconds to keep JVM alive
}


You can get full code here


この runBlocking<Unit> { ... } は top-level の main coroutine を開始するアダプターのように働く。

suspending functions の unit-tests を書くときにもこのやり方は使える

class MyTest {

@Test
fun testMySuspendingFunction() = runBlocking<Unit> {
// here we can use suspending functions using any assertion style that we like
}
}


Waiting for a job

他の coroutine の完了を待つために delay するのはいいや来方じゃない。non-blocking なやり方で、明示的な wait っていうのはこうだ:

fun main(args: Array<String>) = runBlocking<Unit> {

val job = launch { // launch new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // wait until child coroutine completes
}


You can get full code here


結果は同じだが、main coroutine は待ち時間に縛られないようになった。このほうがよい


Extract function refactoring

launch { ... } の中のコードをリファクタリングして分離することを考える。IntelliJ の(使ってるよね?) "Extract function" を使えば suspend が付いた新しい関数が出来上がる。これが人生初めての suspending function だ。 Suspending functions はコルーチンの中で通常関数と同じように使える。それが特別なのは関数の中でコルーチンを suspend できるってことだ。以下は spspending function の中で delay している。

fun main(args: Array<String>) = runBlocking<Unit> {

val job = launch { doWorld() }
println("Hello,")
job.join()
}

// this is your first suspending function
suspend fun doWorld() {
delay(1000L)
println("World!")
}


You can get full code here



Coroutines ARE light-weight

以下のコードを見て欲しい:

fun main(args: Array<String>) = runBlocking<Unit> {

val jobs = List(100_000) { // launch a lot of coroutines and list their jobs
launch {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // wait for all jobs to complete
}


You can get full code here


100K (10万) コルーチンを起動して、それぞれのコルーチンで "." を表示するやつだ。

launchthread に、delayThread.sleep に書き換えたらほとんどのマシンで OOM (out-of-memory) Error になるだろう。


Coroutines are like daemon threads

コルーチンはデーモンスレッドのようなやつだ。次のコードは長期間実行されるコルーチンを main 分の中で実行する。 coroutine は "I'm sleeping" を表示して、3回表示され終わるタイミングで main 文は終了していることが解る。

fun main(args: Array<String>) = runBlocking<Unit> {

launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // just quit after delay
}


You can get full code here


出力は3回目で終わる。

I'm sleeping 0 ...

I'm sleeping 1 ...
I'm sleeping 2 ...

まとめ:アクティブな coroutine はプロセスの終了を妨げることはない。つまり daemon threads のように働く。


Cancellation and timeouts

実運用で重要なキャンセルとタイムアウトについて


Cancelling coroutine execution

小さな main 関数だけで終わるようなアプリケーションにとっては、全ての coroutine が勝手に終了されるというのは良いアイディアのように聞こえるが、大きな、長期間稼働するサーバーアプリではよりきめ細やかな制御が必要になる。launch関数は Job 型の変数を返し、起動中のコルーチンをコントロールできる:

fun main(args: Array<String>) = runBlocking<Unit> {

val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}


You can get full code here


アウトプットは以下のようになる:

I'm sleeping 0 ...

I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

job.cancel が呼び出された後は、coroutine から他の出力がなくなるのが解る。何故ならキャンセルされたから!

ちなみに、canceljoin を同時に行う cancelAndJoin メソッドもある。


Cancellation is cooperative

キャンセルと協調動作について

Coroutine のキャンセルは 協調的 でありうる。Coroutine はキャンセル可能にしておくべきだ。

全ての suspending functions in kotlinx.coroutinescancellable である。

彼らはコルーチンのキャンセル状態をチェックしてキャンセルされていれば CancellationException を throw するように実装されている。

ただし、coroutine が処理中で cancel 状態をチェックしない場合、キャンセルされない。以下の例を参照:

fun main(args: Array<String>) = runBlocking<Unit> {

val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}


You can get full code here


このれいで解るように、cancel がされても while ループの中で interrupt されることはない。


Making computation code cancellable

計算処理コードを cancellable にする方法は二つある。一つは定期的に suspending function を呼び出してキャンセル状態かチェックすること。

この目的のためには yield ファンクションがいいだろう。

もう一つは明示的に cancellation status をチェックすることで、以下が例になる。

先の例の while (i < 5)while (isActive) に置き換えて起動すると

fun main(args: Array<String>) = runBlocking<Unit> {

val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}


You can get full code here


実行結果をみれば解るように、loop はキャンセルされた、isActive

CoroutineScope object を経由して、ブロックの中で利用可能なプロパティである。


Closing resources with finally

キャンセル可能な suspending function は キャンセルされると CancellationException を Throw して、この例外は通常の方法で対処可能である。

例えば, try {...} finally {...} 構文や、Kotlin の use 拡張関数で coroutine がキャンセルされたときの終了処理を行える:

fun main(args: Array<String>) = runBlocking<Unit> {

val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}


You can get full code here


joincancelAndJoin は両方 finalization action が完了するのを待つ。上の実行例は以下の用になる:

I'm sleeping 0 ...

I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.


Run non-cancellable block

coroutine がキャンセルされた場合、finnaly の中で suspending function を実行しようとしても CancellationException が発生するだけになる。

多くの場合、これは問題ではない、何故ならほとんどの closing operation (closing a file, cancelling a job, or closing any kind of a

communication channel) は non-blocking (not suspending function)だから。

しかし、時として finally 内で suspend function を呼びたいかも知れない。そのような時の為に

withContext(NonCancellable) {...} と書くことでブロックを保護できる。

見ての通り withContext function と NonCancellable context の組み合わせで、以下がその例:

fun main(args: Array<String>) = runBlocking<Unit> {

val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}


You can get full code here



Timeout

実環境で最も頻繁に発生する、coroutine の実行をキャンセルしたい理由は、実行時間を超過したためだろう。

手作業で Job の実行と実行時間をモニタリングするのではなくて、スグに利用可能な withTimeout 関数があるのでこれを使って欲しい。

以下が例である:

fun main(args: Array<String>) = runBlocking<Unit> {

withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}


You can get full code here


It produces the following output:

I'm sleeping 0 ...

I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS

タイムアウトの場合、CancellationException のサブクラスである TimeoutCancellationException が throw される。

我々はまだそんな例外のスタックトレースを見たことが無いが、それはcoroutine の中で CancellationException は通常の coroutine 完了の理由として扱われるからである。

しかし、この例では withTimeoutmain 関数直下で使っているので、それが見えることになる.

cancellation はただの例外なので、全てのリソースは通常通り close される。

タイムアウトの時だけ特別な処理をしたいと思ったら、 try {...} catch (e: TimeoutCancellationException) {...} block でタイムアウト処理を囲えば良い。

または、withTimeoutOrNull 関数を使えば、withTimeout と同じに終了してくれるが、タイムアウト時に例外では無く null を返してくれるようになる:

fun main(args: Array<String>) = runBlocking<Unit> {

val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}


You can get full code here


表示は以下の用に、例外では無く null が帰る

I'm sleeping 0 ...

I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null


Composing suspending functions

suspending functions を組み合わせる

このセクションでは、suspending functions を組み合わせる方法について述べる


Sequential by default

2つの suspending function を実行する必要があると思って欲しい。例えばリモートサーバーに行って値を取ってくるなんていう関数だ。

今回は仮に1秒 delay して整数を返す関数を用意する。

suspend fun doSomethingUsefulOne(): Int {

delay(1000L) // pretend we are doing something useful here
return 13
}

suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}

最初に doSomethingUsefulOne を実行して その後に doSomethingUsefulTwo を呼び出す必要がある場合、どのようにしたらいいか?

実際は、最初の関数の結果が二つ目の関数を呼び出す必要があるかどうかの判断に使える。

このコードは coroutine の中にあるので、ただ単に通常の順次呼び出しと同じやり方でやれば、順次呼び出しになる。

次の例はそのやりかたで合計時間を計算する例だ:

fun main(args: Array<String>) = runBlocking<Unit> {

val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}


You can get full code here


It produces something like this:

The answer is 42

Completed in 2017 ms


Concurrent using async

仮に(この場合も) doSomethingUsefulOnedoSomethingUsefulTwo に依存関係がなければ、

両方同時に呼び出してより早く答えを得たいとおもいますよね? そのために async が助けになる。

概念的には、asynclaunch のようなものだ。軽量スレッドであるコルーチンを新しく立ち上げて(※訳注)、

他の coroutine と平行して、その coroutine 内で処理を行う。launch との違いとして、launch は Job を返して返値が無いが、

asyncDeferred を返す。Deferred は軽量 / non-blocking な future (Java の future)で、結果を後で返すための promise の表現だ。

Deferred な値を後から取得する為に .await() を使える。

Deferred is a Job であり、必要に応じて cancel できる。

※訳注:coroutine としては分離しているが、非分離スレッドで実行もできる(Guava の MoreExecutors.directExecutor() のイメージ)。というか Microservices freak は同じスレッドで async して初めての suspending function の呼び出しで分離して欲しいと思っているはず

fun main(args: Array<String>) = runBlocking<Unit> {

val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}


You can get full code here


It produces something like this:

The answer is 42

Completed in 1017 ms

二倍早くなった!何故なら二つの coroutine を並列実行するようにしたからだ。

覚えておいてほしい: coroutine を使った 平行性は常に明示的に(この場合 async を使って)行う。


Lazily started async

async には遅延実行の機能もある。start オプションに CoroutineStart.LAZY を渡す.

この場合 coroutine の実行は、値が確実に必要と解る await が呼ばれたタイミングか start 関数が呼ばれたタイミングまで遅延される:

fun main(args: Array<String>) = runBlocking<Unit> {

val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
// some computation
one.start() // start the first one
two.start() // start the second one
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}


You can get full code here


It produces something like this:

The answer is 42

Completed in 1017 ms

この例では、2つのコルーチンが定義されるが、直ちには実行されず、プログラマーにそれらの実行をいつ始めれば良いかが委ねられる。

ノート: 我々が await を println の中で呼んだタイミングでまだ start が呼ばれていなくても,

await が呼ばれたタイミングで計算が順次実行されるが、これは明らかに LAZY のユースケースではないだろう。

async(start = CoroutineStart.LAZY) のユースケースは標準的な遅延関数の置き換えで、計算に suspending function が必要となる場合に利用される。


Async-style functions

async coroutine builder を使えば、

doSomethingUsefulOnedoSomethingUsefulTwo を非同期に呼び出す async style function を定義することができる。

このような関数に Async 接尾辞を付けるのは良いスタイルだ。

"Async" 接尾辞は、その関数が非同期計算を開始し、deferred で結果を返すことを解りやすくする。

// The result type of somethingUsefulOneAsync is Deferred<Int>

fun somethingUsefulOneAsync() = async {
doSomethingUsefulOne()
}

// The result type of somethingUsefulTwoAsync is Deferred<Int>
fun somethingUsefulTwoAsync() = async {
doSomethingUsefulTwo()
}

ノート:これらの xxxAsync 関数は suspending functions ではない. これらの関数はどこからでも利用できる。

ただし、返値は常に非同期で(ここでは 並列性を持って)実行される事を意味している。

それらの関数を coroutine の外で利用する例を示す。呼び出しは外で行うが、.await()runBlocking {...} 内で呼び出す必要がある:

// note, that we don't have `runBlocking` to the right of `main` in this example

fun main(args: Array<String>) {
val time = measureTimeMillis {
// we can initiate async actions outside of a coroutine
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// but waiting for a result must involve either suspending or blocking.
// here we use `runBlocking { ... }` to block the main thread while waiting for the result
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}


You can get full code here



Coroutine context and dispatchers

coroutine context (coroutine 文脈)とディスパッティについて

coroutine は、常に、CoroutineContext

型で表現される context 上で実行される。CoroutineContext は Kotlin 標準ライブラリの一部だ。

Coroutine context はいくつかの変数の組み合わせである。特に重要な要素は以前見たコルーチンの Jobと、この章で述べるその dispatcher だ。


Dispatchers and threads

ディスパッチャーとスレッド

Coroutine コンテキストは coroutine dispatcher を含んでいる。dispatcher によってどの thread(s) 上で coroutine を実行すれば良いのかを決定している。Coroutine dispatcher は coroutine が特定のスレッド、スレッドプールまたは confined の場所のどこで実行されるかを制御できる。

全ての coroutines builders 、例えば launchasync は任意引数として

CoroutineContext

パラメータを取る。CoroutineContext は明示的に dispatcher を指定する時に使える。

次の例を実行して欲しい:

fun main(args: Array<String>) = runBlocking<Unit> {

val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}

(個人メモ、coroutineContext でこうなるのはへぇという感じ)


You can get full code here


It produces the following output (maybe in different order):

      'Unconfined': I'm working in thread main

'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main

以前のセクションで利用していた default dispatcher は DefaultDispatcher で、これは現在の実装では CommonPool と等しい。つまに launch { ... } って書くのは launch(DefaultDispatcher) { ... } と、つまり launch(CommonPool) { ... } と同じ.

parent

coroutineContext

Unconfined context の違いはこの後スグ出てくる。

Note: newSingleThreadContext は新しいスレッドを作り、これはすごいリソースを消費する。

実際のリアルアプリケーションでは、これは不要になったら close 関数をつかってリリースされるか、top-level の変数で引き回して再利用する必要がある。(訳注:つまり、ループや処理のために new Thread するのは止めよう!)


Unconfined vs confined dispatcher

Unconfired v.s. confined dispatcher

Unconfined coroutine dispatcher は coroutine を caller thread で始める。ただし、最初の suspension point までだ。Suspension が終わると、呼び出した suspending function によって resume が実行されるスレッドが決まる。Unconfined dispatcher は、coroutine が CPU time を消費せず、UI のように特殊なスレッドに束縛されている共有データをアップデートしないときにに適切な dispatcher だ。

一方で、coroutineContext がある。これはどんな coroutine でも利用可能なプロパティで、

この特定の coroutine context の参照である。

この方法で、parent context を継承することができる。runBlocking coroutine のデフォルトの dispatcherは、特に、呼び出しスレッドに縛り付けられ、これを継承することで実行を予測可能な FIFO 順序でこのスレッド上で行う効果がある。

fun main(args: Array<String>) = runBlocking<Unit> {

val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
delay(500)
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}


You can get full code here


Produces the output:

      'Unconfined': I'm working in thread main

'coroutineContext': I'm working in thread main
'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'coroutineContext': After delay in thread main

runBlocking {...}coroutineContext を継承した coroutine は main thread で実行を継続して(ログの4行目)、

一方で unconfined は delay 関数が使っている default executor 上で処理を継続している。


Debugging coroutines and threads

coroutine と thread のデバッグ

Coroutines は特定の thread で動き、Unconfined ディスパッチャや multi-thread dispatcher であれば他の thread 上でresumeする。single-thread dispatcher のときでも、coroutine が実行中かどうかを確認することは難しい。アプリケーションをデバッグする共通のアプローチとして、thread name をログファイルの書く行に出力しておく、というのがある。この機能は広くログフレームワークでサポートされている。coroutine を使う時には、thread 名だけでは多くを教えてくれないこともあり、 kotlinx.coroutines はこれらを簡単にする仕組みを提供している。

以下のコードを -Dkotlinx.coroutines.debug JVM option と共に実行してみて欲しい:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
val a = async(coroutineContext) {
log("I'm computing a piece of the answer")
6
}
val b = async(coroutineContext) {
log("I'm computing another piece of the answer")
7
}
log("The answer is ${a.await() * b.await()}")
}


You can get full code here


このコードには 3 つのコルーチンがある。main coroutine (#1) -- runBlockingが作ると async が作った a, b である。全てのコルーチンは runBlocking context の中で実行されて main thread に束縛されている。出力コードは以下の用になる。

[main @coroutine#2] I'm computing a piece of the answer

[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

log 関数はスレッドネームを出力するが、デバッグフラグが付いている時にはそこに実行中の coroutine が追記されている。

他の debugging facilities は newCoroutineContext のドキュメントの中に記述がある。


Jumping between threads

thread 間のジャンプ

以下のコードを -Dkotlinx.coroutines.debug JVM option 付きで実行してみて欲しい:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}


You can get full code here


このコードには、まだ紹介していない新しい機能が入っている。

* runBlocking を特定の context を明示して使っている

* withContext を特定の context に切り替えるために使っている。同じ coroutine にいるままで

[Ctx1 @coroutine#1] Started in ctx1

[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

ノート:これは Kotlin 標準ライブラリに含まれている use 関数の説明にもなっている。この関数は newSingleThreadContext で作られたスレッドが不要になったタイミングで削除している。


Job in the context

context 中の Job

Coroutine の Job は context の一部である。使用されている coroutine から、context を取得する事ができる。これには

coroutineContext[Job] を使う:

fun main(args: Array<String>) = runBlocking<Unit> {

println("My job is ${coroutineContext[Job]}")
}


You can get full code here


It produces something like that when running in debug mode:

My job is "coroutine#1":BlockingCoroutine{Active}@6d311334

CoroutineScopeisActive は次のショートカットだ

coroutineContext[Job]?.isActive == true.


Children of a coroutine

coroutine の子ども

coroutine の coroutineContext

が他の coroutine を起動するのに使われると、新しい coroutine の Job は親の coroutine Job の 子ども になる。

親の coroutine が cancel されると、その子の coroutine も再帰的に cancel される。

fun main(args: Array<String>) = runBlocking<Unit> {

// launch a coroutine to process some kind of incoming request
val request = launch {
// it spawns two other jobs, one with its separate context
val job1 = launch {
println("job1: I have my own context and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
// and the other inherits the parent context
val job2 = launch(coroutineContext) {
delay(100)
println("job2: I am a child of the request coroutine")
delay(1000)
println("job2: I will not execute this line if my parent request is cancelled")
}
// request completes when both its sub-jobs complete:
job1.join()
job2.join()
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}


You can get full code here


The output of this code is:

job1: I have my own context and execute independently!

job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

つまり launch(coroutineContext) をしている job2 は cancel propagation された。


Combining contexts

context を結合する

Coroutine contexts は + operator を使うことで結合できる。右側の context が対応する右側 context の entry を置き換える。例えば、親 coroutine の Job を継承しながら、その dispatcher を置換するには:

fun main(args: Array<String>) = runBlocking<Unit> {

// start a coroutine to process some kind of incoming request
val request = launch(coroutineContext) { // use the context of `runBlocking`
// spawns CPU-intensive child job in CommonPool !!!
val job = launch(coroutineContext + CommonPool) {
println("job: I am a child of the request coroutine, but with a different dispatcher")
delay(1000)
println("job: I will not execute this line if my parent request is cancelled")
}
job.join() // request completes when its sub-job completes
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}


You can get full code here


The expected outcome of this code is:

job: I am a child of the request coroutine, but with a different dispatcher

main: Who has survived request cancellation?


Parental responsibilities

保護者の責任

親 coroutine は常に子 coroutine の実行を待つ。親は明示的に子をトラックする必要は無く、待つために join を使う必要も無い:

fun main(args: Array<String>) = runBlocking<Unit> {

// launch a coroutine to process some kind of incoming request
val request = launch {
repeat(3) { i -> // launch a few children jobs
launch(coroutineContext) {
delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join() // wait for completion of the request, including all its children
println("Now processing of the request is complete")
}


You can get full code here


The result is going to be:

request: I'm done and I don't explicitly join my children that are still active

Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete


Naming coroutines for debugging

デバッグのための名前付けされた coroutine

自動的に割り当てられた id は coroutine のログが頻繁に存在して、単に同じ coroutine のログをまとめて見たいときには十分だが。ただし、 coroutine が特定のリクエストの処理に縛られたり特定のバックグラウンドタスクを実行しているときには、デバッグのために明示的な名前を付けたほうがいいだろう。CoroutineName context element は thread name のように名前を保存する。またそれはdebugging mode が on のときには thread name の一部として表示される。

次のコードがこのコンセプトだ:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
log("Started main coroutine")
// run two background value computations
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("Computing v1")
252
}
val v2 = async(CoroutineName("v2coroutine")) {
delay(1000)
log("Computing v2")
6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}


You can get full code here


The output it produces with -Dkotlinx.coroutines.debug JVM option is similar to:

[main @main#1] Started main coroutine

[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42


Cancellation via explicit job

明示的な job を経由したキャンセル

context, 子 そして jobs を一緒に使うための、我々の knowledge を書かせて欲しい。

我々のアプリケーションはライフサイクルのあるオブジェクトを持っており、そのオブジェクトは coroutine ではない。例えば、Android application を書いている時は様々な coroutine を Android activity の中で起動し、それはデータを fetch / upload, animation するような非同期処理を実行する。このような coroutine はどれも activiety が destoroy されたらメモリーリークを避ける為に cancel されなければならない。

我々は coroutine のライフサイクルを Job 型のインスタンスを作成して activity のライフサイクルと紐付けておくことで事で管理することができる。Job() factory function を使うことで作られた job のインスタンスを次に示す。簡略化するために、launch(coroutineContext + job) を使うのではなく launch(coroutineContext, parent = job) 明示的に parent job が使われることを記述出来る。

今、単一の Job.cancel は全ての起動された子を cancel する。更に、Job.join は全てが完了するのを待ち、同じく cancelAndJoin も使うことができる:

fun main(args: Array<String>) = runBlocking<Unit> {

val job = Job() // create a job object to manage our lifecycle
// now launch ten coroutines for a demo, each working for a different time
val coroutines = List(10) { i ->
// they are all children of our job object
launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
}
println("Launched ${coroutines.size} coroutines")
delay(500L) // delay for half a second
println("Cancelling the job!")
job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
}


You can get full code here


The output of this example is:

Launched 10 coroutines

Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!

最初の3つのコルーチンが print されて残りは job.cancelAndJoin() でキャンセルされた。

つまり必要なのは Activity が作られたときに Job を作成して、coroutine を作るときにその子として作り、activity が destoroy されるときに Job を cancel することってことになる。Android lifecycle の場合、子 coroutine は非同期なので join することはできない。しかし join できるということはバックエンドサービスでは、紐付けられたリソースの利用を確かにするという意味で有用だ。


Channels

チャネル

Deferred な値は単一の値を coroutine 間で簡単に引き渡す方法を提供した。

チャネルは、複数の値をストリームで転送する方法を提供する。


Channel basics

Channel はコンセプトとしては BlockingQueue に非常に似ている。最大の違いは、pub オペレーションが blocking なのに対して、send オペレーションは suspending だってことで、同じく take オペレーションは blocking だが、receive は suspending だ。

fun main(args: Array<String>) = runBlocking<Unit> {

val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}


You can get full code here


The output of this code is:

1

4
9
16
25
Done!


Closing and iteration over channels

クローズと channel のイテレーション

quque と違って、channel はもうエレメントが到達しない事を示すために close することができる。

受信側では、通常の for ループを使って要素を受け取るのが便利だ。

概念的に、close は close ようの特別なトークンを Channel に送信しているのようなものだ。

特殊トークンが受信されると、イテレーションはそこで止まり、同時に特殊トークン以前の全てのトークンが受信されていることが保証される:

fun main(args: Array<String>) = runBlocking<Unit> {

val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}


You can get full code here



Building channel producers

Channel 生成源を作成する

element のシーケンスを生成する coroutine を作るパターンはほとんど同じ。

producer-consumer パターンの一部で、平行コードで頻繁に見ることができる。

たぶんこのように抽象化できる:channel をパラメーターとして取る関数関数があって、通常関数が結果を返さないと行けないのと逆で、関数の中で値が生成される。

このパターンに対応するため produce という名前の coroutine builder があり、producer の右にラムダ式を書くだけで簡単に channel を作成する事ができる。そして channel の拡張関数である consumeEach は consumer side の for loop を置き換えてくれる。

fun produceSquares() = produce<Int> {

for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}


You can get full code here



Pipelines

パイプライン

パイプラインは、一つの coroutine が、潜在的に無限のデータストリームを生成するパターンである。

fun produceNumbers() = produce<Int> {

var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}

他の coroutine または coroutines が stream を consuming し、プロセッシングして、必要に応じて他の値を生成したりする。以下にいくつかの例を示す:

fun square(numbers: ReceiveChannel<Int>) = produce<Int> {

for (x in numbers) send(x * x)
}

The main code starts and connects the whole pipeline:

fun main(args: Array<String>) = runBlocking<Unit> {

val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
for (i in 1..5) println(squares.receive()) // print first five
println("Done!") // we are done
squares.cancel() // need to cancel these coroutines in a larger app
numbers.cancel()
}


You can get full code here


coroutine を close するのは、exmaple app のような小さなアプリケーションでは必須では無い。

なぜなら

coroutines are like daemon threads なので、終了時に同時に終了するから。

だが、大きなアプリケーションでは、不要なパイプラインは停止しておく必要がある。

その代わりに、pipeline coroutine を次の例で示すような

children of a main coroutine とする方法もある.


Prime numbers with pipeline

pipeline を使った素数の列

coroutine の pipeline を使った素数を生成する例を使って、パイプラインの極端な例を見てみましょう。

まずは無限の sequence of numbers から初める。ここでは、caller が coroutine がどこで実行されるかを選択するための、明示的な context パラメーターを produce builder に渡す方法を使う:

fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {

var x = start
while (true) send(x++) // infinite stream of integers from start
}

次の pipeline ステージでは、与えられた素数で割りきれる数字を filter する。

fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {

for (x in numbers) if (x % prime != 0) send(x)
}

さて、prime number stream を作成するために、2 から初めて、prime number を取得し、取得した prime number を使って新しい stream を作って行くことができる:

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... 

以下の例は最初の 10 個の prime number を表示する。

全ての coroutine は main thread の context で実行される。全ての coroutine は main runBlocking coroutine であり、そのcoroutineContext の子として起動されるため、明示的な全ての coroutine リストを持たなくても、 cancelChildren 拡張関数を呼び出すことでそれらを cancel することができる。

fun main(args: Array<String>) = runBlocking<Unit> {

var cur = numbersFrom(coroutineContext, 2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(coroutineContext, cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}


You can get full code here


The output of this code is:

2

3
5
7
11
13
17
19
23
29

ノート: 同じパイプラインを標準ライブラリの

buildIterator coroutine builder を使って作る事ができる。

producebuildIteratorで、sendyield で、 receivenext で、

ReceiveChannelIterator で、そしてそのコンテクストをりようしてだ。

この場合 runBlocking は必要無い。

しかし、上で示したような channel を使った pipeline の利点は、CommonPool context を使ったら実際は複数コアを使うことができる点だ。

何れの場合も、これは極端にインパクトのある素数列を生成する例だった。

実環境では、パイプラインは他の suspending function を呼んで (remote services への非同期呼び出しなど)、それらのパイプラインは buildSequence/buildIterator では構築できないものになるだろう。なぜならそれらは完全に非同期である produce と違ってsuspension を許していないからだ。

(個人メモ:なぜ?)


Fan-out

ファンアウト

複数の coroutine は同じ channel からイベントを受信するかもしれない。

まずは、1秒で10個の整数を定期的に生成する producer coroutine から始めよう:

fun produceNumbers() = produce<Int> {

var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}

そして、複数の processor coroutine (= consumer coroutine) も持つことができる。この例では単に coroutine id と受信した整数を println するだけだ:

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {

for (msg in channel) {
println("Processor #$id received $msg")
}
}

では、5個の processor を約 1 秒間稼働させよう。何が起こるか見てみよう:

fun main(args: Array<String>) = runBlocking<Unit> {

val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}


You can get full code here


結果は次の用になるはずだ。それぞれの整数を受信した processor id は違うだろうが。。。:

Processor #2 received 1

Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

ノート: producer coroutine が channel を close すると、他の processor coroutines による channel のイテレーションも順次終了する。

また、launchProcessor の中で、どのように明示的に channel をイテレートしているかにも注意して欲しい。consumeEach と違って、for loop pattern は複数の coroutines に対して完全に安全である。もし一つの coroutines が異常終了しても、他が channel の processing を継続できるが、consumeEach によって書かれた processor はその正常または異常終了時に、利用している channel を消費(cancel)してしまう。


Fan-in

ファンイン

複数の coroutines が同じチャネルに書き込んでもよい。たとえば、String の channel があって、suspending function が指定されたインターバルでもって定期的に同じチャネルを書き込むような場合:

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {

while (true) {
delay(time)
channel.send(s)
}
}

さあ、string を送り込む複数の channel を起動して何が起こるかみてみよう。

(この例では、main thread の子どもになるように main thread の corotuineContext で coroutine を起動している)

fun main(args: Array<String>) = runBlocking<Unit> {

val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo", 200L) }
launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}


You can get full code here


The output is:

foo

foo
BAR!
foo
foo
BAR!


Buffered channels

バッファされたチャネル

今までに見た channel はどれもバッファを持っていなかった。

バッファされていないチャネルは sender と receiver が receiver がお互いに出会った(ランデブー、した)タイミングで要素を転送する。

もし送信が先に行われたとすると、要素が受信されるまで sender は suspend するし、逆に receive が先に行われると、send が呼ばれるまで suspend する。

Channel() のfactory function と produce builder はオプションで buffer size を指定する capacity パラメータを取る。

このバッファーは sender が suspend するまえに複数の要素を送る事を可能にする、BlockingQueue のようなやつ。

BlockingQueue で capacity が満タンになると block するのと同じような形で、channel は buffer が満タンになったタイミングで初めて suspend する。

次のコードで挙動を見てみよう:

fun main(args: Array<String>) = runBlocking<Unit> {

val channel = Channel<Int>(4) // create buffered channel
val sender = launch(coroutineContext) { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
}


You can get full code here


"sending" が 5 回表示されたはずだ。:

Sending 0

Sending 1
Sending 2
Sending 3
Sending 4

キャパシティが 4 なので4回の send が成功し、5回目の呼び出しで Block した。(println は send の前にあることに注意)


Ticker channels

ティッカーチャネル(ティッカー = 心拍、あるいはチカチカするもの)

Ticker channel は特別なランデブー channel で、最後に element が consume されてから一定期間毎に Unit 要素を produce するようなものだ。

これだけでは使い道がないような気がするが、windowing や他の時間ベースの処理をするために、複雑な time-based produce pipeline を構築・運営するために有益である。

Ticker channel は select の中で、"on tick" 処理を実行するのにも使える。

この処理を実装する為に ticker factory method が使える。

追加の element が必要無いことを明示的に示すためには、cancel する必要がある。

それでは、例を見てみよう:

fun main(args: Array<String>) = runBlocking<Unit> {

val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")

nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")

// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

tickerChannel.cancel() // indicate that no more elements are needed
}


You can get full code here


It prints following lines:

Initial element is available immediately: kotlin.Unit

Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

ノート: デフォルトで ticker は consumer による pause を明に扱い、要素の間隔を一定に保つためにポーズが発生すると次のelementの発生を早める。

オプションで、mode パラメーターに TickerMode.FIXED_DELAY を設定すれば、この挙動を element 間で fixed delay を保つように変更できる。


Channels are fair

Channel は公正(fair share scheduling)です!

channel に対する、複数スレッドからの送受信操作は 公正 である。またそれらは first-in first-out 順序で配信される。

つまり、最初に receive を実行した coroutine が最初に要素を受け取る。次の例は 2 つの "poig" "pong" の2つの coroutine が

共有の table channel から "ball" を receive する例だ。

data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
val table = Channel<Ball>() // a shared table
launch(coroutineContext) { player("ping", table) }
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}


You can get full code here


"ping" coroutine が最初に起動されるので、最初に ball を receive するのは "ping" になる。

"ping" coroutine がボールをsend した後に、直ちにレシーブを開始しても、次に ball は "pong" coroutine でレシーブされる。

なぜなら、"ping" coroutine が待つより先に "pong" が待っていたからである。

ping Ball(hits=1)

pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

ノート: たまに元になっている executor が実行中の場合に、 channel は unfair に見える挙動をすることもある。

詳細はこの Issue を参照してほしい。


Shared mutable state and concurrency

共有された変更可能な状態と concurrency

Coroutine は CommonPool のような multi thread dispatcher 上で動く場合、平行実行され得る。

そしてそれは、よくある平行プログラミングの問題を引き起こす。

主な問題は 変更可能な共有状態 への同期化されたアクセスである。

coroutine の世界におけるこの問題に対するいくつかの解決策は、multi-thread の世界における解決策と一緒だ、ただ他の物は独特である:


The problem

問題の定式化

1千 coroutine を起動して、全ての coroutine に 1千回処理をさせることを考える(合計で100万処理だ)

それと同時に、比較の為に実行時間を取得しておこう:

suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {

val n = 1000 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch(context) {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}

まず始めに、共有の変数を multi-thread CommonPool context を使って increment していくシンプルな action から始めよう。

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}


You can get full code here


最終的に何が見えた? "Counter = 1000000" にはほど遠かっただろう。なぜなら 1000 の coroutine が同期化されることなく counter を平行でインクリメントしたからである。

ノート: もし CPU 数が2より少ない古いシステムを使っていたら、1000000 が見えたかも知れない、何故なら CommonPool はこの場合1つのスレッドしか使わないからだ。

この結果を再現するためには、以下のコードを利用する必要がある。:

val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
counter++
}
println("Counter = $counter")
}


You can get full code here



Volatiles are of no help

Volatiles は助けにならない

よくある誤解として volatile を付けておけば平行性の問題が解決されるというのがある。やってみよう:

@Volatile // in Kotlin `volatile` is an annotation 

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}


You can get full code here


コードは遅くなったが、まだ "Counter = 1000000" にはならない。何故なら volatile は対象変数への読み取りと書き取りを

直列化 (または技術的には "atomic") にする事は保証するが、それ以上の大きな動作(increment in our case)は atomic にしてくれないからだ。


Thread-safe data structures

Thread-safe なデータ構造

thread と coroutine で共に動く一般的な解決策は、thread-safe (synchronized, linearizable または atomic な)データ構造を使うことだ。

データ構造が、共有状態上で行われる処理に対応した synchronization を全て実施してくれる。

単純なカウンターであるこの場合では、我々は incrementAndGet 操作がある AtomicInteger クラスを使える:

var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}


You can get full code here


これは、この問題に関しては最速の解決策だ。

単純な counter や、collection, queue やその他標準的なデータ構造と基本的な操作に関しては標準のライブラリが解決策になる。

ただし、これはthread-safe な実装が用意されていない、複雑な状態やオペレーションには簡単にはスケールしない。


Thread confinement fine-grained

きめ細やかなスレッド制約

細やかな thread confinement は共有可変変数へのアクセスの問題を、特定のスレッドに限ることで解決する解決策だ。

これは、 UI application でよく使われている。UI state は特定の single event-dispatch/application thread に confined だ。

この方法は、single-threaded context を利用する事で簡単に coroutine にも適用する事ができる:

val counterContext = newSingleThreadContext("CounterContext")

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // run each coroutine in CommonPool
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
}


You can get full code here


むっちゃ遅い! 何故ならこれは きめ細やかな thread-confinement だからだ。インクリメントの度に multi-threaded CommonPool context から withContext を使った single-threaded context への切り替えが発生する。


Thread confinement coarse-grained

大域的 thread 保証

実際には、thread 保証は大きな固まりで行われることが多い。例えば、状態を変更する大きな business logic を single thread に縛り付けるなど。

以下の例はそのようなことをやっている、それぞれの coroutine を一つの thread context の中で動かす:

val counterContext = newSingleThreadContext("CounterContext")

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(counterContext) { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
}


You can get full code here


(fine-grained より)もっと早くて、正確な値が取得できるようになった!


Mutual exclusion

排他制御

排他制御による解決は、全ての共有状態への変更を平行して実行されない critical section で保護するやり方だ。

blocking world では、よく synchronizedReentrantLock をこのために使うんじゃないかな。

Coroutine の代用手段は Mutex と呼ばれている。これは lock and unlock 関数を持って critical section を区分することができる。

Mutex.lock() の最大の違いは、これは suspending function だってことだ。つまり OS thread を block することはない!

withLock 拡張関数が、 mutex.lock(); try { ... } finally { mutex.unlock() } パターンの簡単な表現なので使うと良い。

val mutex = Mutex()

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}


You can get full code here


この例のロックは粒度が細かく、そのために実行時間も長くかかっている。

ただし、君が絶対に共有状態を定期的に更新しなければならなくて、束縛するのに適当なスレッドが無い・作成したく無い場合はこの例はいい選択だ。


Actors

actor は以下の3つの要素からなる entity である

* coroutine

* その coroutine の中に隠蔽されて束縛された状態(state)

* 他の coroutine とコミュニケーションを取るための channel

シンプルな actor は関数として書くことができるが、複雑な状態を持った actor はクラスとして表現するのが妥当だろう。

Kotlin は actor coroutine builder を持っている。

読んで字のごとく、スコープ内にメッセージ受け入れるために actor の mailbox channel をスコープ内に簡単に取り込み、

また送信 channel を結果 job object に結びつけて、handle のように取り回し可能な actor の単一参照を作る

actor を使う最初のステップは、actor が処理しようとしているメッセージをクラスとして定義することだ。

Kotlin の sealed classes はこの目的によく合致する。

ここでは CounterMsg sealed class を定義して、同時にカウンターをインクリメントする IncCounter メッセージと

値を取得する為の GetCounter メッセージを実装する。

後者はレスポンスを送信するために必要な物だ。

CompletableDeferred communication primitive は、受け手で処理したときに単一要素を受信する処理を表現し、

この目的ではレスポンスを actor から送信するために利用される。

// Message types for counterActor

sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

これを使って、actor coroutine builder を呼び出す関数を定義しよう

// This function launches a new counter actor

fun counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}

main code は直感的に以下の用になる:

fun main(args: Array<String>) = runBlocking<Unit> {

val counter = counterActor() // create the actor
massiveRun(CommonPool) {
counter.send(IncCounter)
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}


You can get full code here


actor 自信がどんな context で実行されているのかは正確性にとって問題で無くなる。

Actor は coroutine であって、coroutine は順次実行され、特定のコルーチンへ状態を束縛することは、共有可変状態の問題への解決策として働く。

加えて、actor は彼ら自身の private 状態を変更することもでき、しかしロックを避ける為に他の actor へはメッセージを通じてのみ影響を及ぼせる。

actor はロードがかかっているときには locking よりも効率の良い手段である。なぜならそのような場合では常にやるべき作業があり、

しかし他の context にスイッチする必要が全く無いからだ。

(約:channel に絶え間なく send される状況下では、micro thread の切り替えすら発生しない、と言うようなことを言っている?)


ノート: actor coroutine builder は produce coroutine builder が二重になったものだ。

Actor はメッセージを受信する channel と紐付けられて、producer はメッセージを送信する channel に紐付けられる。



Select expression

Select 式

Select 式は複数の suspending 関数を同時に await することを可能にし、

最初に利用可能となったものを select (選択) する。


Selecting from channels

複数の channel からの select

fizz buzz という文字からなる二つの producer があるとしよう。fizz は 300ms 毎に "Fizz" を produce する:

fun fizz(context: CoroutineContext) = produce<String>(context) {

while (true) { // sends "Fizz" every 300 ms
delay(300)
send("Fizz")
}
}

bazz は 500ms 毎に "Buzz!" を produce する:

fun buzz(context: CoroutineContext) = produce<String>(context) {

while (true) { // sends "Buzz!" every 500 ms
delay(500)
send("Buzz!")
}
}

receive suspending function を使うと、我々は どちらかの channel からのメッセージの待ち状態へ移行することしか。

しかし select 式では、その onReceive 節を利用する事で、 両方 の待ち状態とすることができる:

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {

select<Unit> { // <Unit> means that this select expression does not produce any result
fizz.onReceive { value -> // this is the first select clause
println("fizz -> '$value'")
}
buzz.onReceive { value -> // this is the second select clause
println("buzz -> '$value'")
}
}
}

全体を 7 回実行してみよう:

fun main(args: Array<String>) = runBlocking<Unit> {

val fizz = fizz(coroutineContext)
val buzz = buzz(coroutineContext)
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}


You can get full code here


このコードの結果は以下のようになる:

fizz -> 'Fizz'

buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'


Selecting on close

select 中の onReceive 節は channel が close されていると、select は対応した例外を投げる。

onReceiveOrNull 節を使うと、channel が close されていた場合に特定の挙動をさせることができる。

次の例は、select が選択した節の結果を返却するような例となっている

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =

select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
"a -> '$value'"
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
"b -> '$value'"
}
}

a を 4回 "Hello" メッセージを produce する channel で、

b を 4回 "World" メッセージを produce する channel とする:

fun main(args: Array<String>) = runBlocking<Unit> {

// we are using the context of the main thread in this example for predictability ...
val a = produce<String>(coroutineContext) {
repeat(4) { send("Hello $it") }
}
val b = produce<String>(coroutineContext) {
repeat(4) { send("World $it") }
}
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}


You can get full code here


この結果は興味深い物になるので、何が起こるか詳しく分析してみよう:

a -> 'Hello 0'

a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed

何個かここから観測できることがある、

まず select は最初の節への 偏り があるということだ。同時に複数の channel が選択可能になった場合、最初のものが select される。

ここでは、両方の channel が同時に文字を produce しているので、a チャネル、つまり select の最初の節に書かれている、が勝者となる。

しかし、ここでは unbeffered channel を利用しているため、asend の呼び出しによって複数回 suspended になり、b にも送信機会が回ってくることもある。

二つ目に、 onReceiveOrNull は channel が閉じられたあと、直ちに select されるわけでは無いと言うことだ。


Selecting to send

送信のための selecting

Select 式は onSend 節を持っており、selection の偏りのある特性と組み合わせると非常にいい働きをしてくれる。

整数の producer であって、primary channel の consumer が追いつけないときに、値を side channel に送信するものを考えて見よう:

fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {

for (num in 1..10) { // produce 10 numbers from 1 to 10
delay(100) // every 100 ms
select<Unit> {
onSend(num) {} // Send to the primary channel
side.onSend(num) {} // or to the side channel
}
}
}

コンシューマは非常に遅く、一つの整数を消費するのに 250ms かかるとする

fun main(args: Array<String>) = runBlocking<Unit> {

val side = Channel<Int>() // allocate side channel
launch(coroutineContext) { // this is a very fast consumer for the side channel
side.consumeEach { println("Side channel has $it") }
}
produceNumbers(coroutineContext, side).consumeEach {
println("Consuming $it")
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
coroutineContext.cancelChildren()
}


You can get full code here


So let us see what happens:

Consuming 1

Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming


Selecting deferred values

deferred value を select する

Deferred value は onAwait 節を使って select することができる。

与えられた遅延の後に string を返却するような非同期関数を作り:

fun asyncString(time: Int) = async {

delay(time.toLong())
"Waited for $time ms"
}

ランダムな遅延を引数にその関数を1ダース作る事にする

fun asyncStringsList(): List<Deferred<String>> {

val random = Random(3)
return List(12) { asyncString(random.nextInt(1000)) }
}

そして、main 関数では最初に完了したものを待って、残りで active なままの deferred values の数を表示するようにする。

ここで利用している select 式は Kotlin DSK であるという事実により、

我々は 通常の code を利用して select に節を与えることができる。

この例では、deferred value を含んで居る list 上を走査して、それぞれの deferred value 上で onAwait 節を呼び出している。

fun main(args: Array<String>) = runBlocking<Unit> {

val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
deferred.onAwait { answer ->
"Deferred $index produced answer '$answer'"
}
}
}
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
}


You can get full code here


出力はこうなる:

Deferred 4 produced answer 'Waited for 128 ms'

11 coroutines are still active


Switch over a channel of deferred values

channel producer function であって、deferred string の channel を消費し、それぞれの deferred が確定するまで待ってそれを

send するものを考える。ただし待ち時間は、次の deferred value が届くまでである。

この例は onReceiveOrNullonAwait 節を同じ select 中で利用している:

fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {

var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
val next = select<Deferred<String>?> { // return next deferred value from this select or null
input.onReceiveOrNull { update ->
update // replaces next value to wait
}
current.onAwait { value ->
send(value) // send value that current deferred has produced
input.receiveOrNull() // and use the next deferred from the input channel
}
}
if (next == null) {
println("Channel was closed")
break // out of loop
} else {
current = next
}
}
}

これをテストするために、特定の時間だけ待った後に指定した文字列を resolve する単純な async function を利用する:

fun asyncString(str: String, time: Long) = async {

delay(time)
str
}

main function は switchMapDeferreds の結果を表示するシンプルな coroutine を起動して、テストデータを送信する::

fun main(args: Array<String>) = runBlocking<Unit> {

val chan = Channel<Deferred<String>>() // the channel for test
launch(coroutineContext) { // launch printing coroutine
for (s in switchMapDeferreds(chan))
println(s) // print each received string
}
chan.send(asyncString("BEGIN", 100))
delay(200) // enough time for "BEGIN" to be produced
chan.send(asyncString("Slow", 500))
delay(100) // not enough time to produce slow
chan.send(asyncString("Replace", 100))
delay(500) // give it time before the last one
chan.send(asyncString("END", 500))
delay(1000) // give it time to process
chan.close() // close the channel ...
delay(500) // and wait some time to let it finish
}


You can get full code here


結果は次のようになる:

BEGIN

Replace
END
Channel was closed


Further reading