注意:この記事はリリース前の文章を元にしたのですが、リリース時点で API の破壊的変更があったようですが内容をまだアップデートできていません。参考にお読み下さい。
Original: https://github.com/Kotlin/kotlinx.coroutines/blob/7861411fbce7f6c17e97dd2fb2961ebce5587930/README.md
Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
上記の雑な訳。2018/08 時点。Apache 2.0
Guide to kotlinx.coroutines by example
This is a guide on core features of kotlinx.coroutines
with a series of examples.
Introduction and setup
言語としての Kotlin は、他の各種ライブラリが使うような、最低限の low-level API 群だけを標準ライブラリとして提供する。他の多数の言語と違って、async
await
は Kotlin におけるキーワードではなくて、標準ライブラリの一部でもない。
kotlinx.coroutines
はこのような(async
/ await
を提供する)高レベルライブラリである。このガイドでカバーされる、数々の高レベルな coroutine-enabled primitives を含む。例えば launch
, async
等。このガイドで使われている要素を利用するために kotlinx-coroutines-core
を依存関係に追加することが必要である。
Table of contents
-
Coroutine basics / コルーチンの基本
- Your first coroutine / 最初のコルーチン
- Bridging blocking and non-blocking worlds / Blocking と Non-blocking の橋を架ける
- Waiting for a job / Jobを末
- Extract function refactoring / 関数のリファクタリング
- Coroutines ARE light-weight / コルーチンは 軽量 である
- Coroutines are like daemon threads / コルーチンはデーモンスレッドのようである。
- Cancellation and timeouts / キャンセルとタイムアウト
- Composing suspending functions
- Coroutine context and dispatchers
- Channels
- Shared mutable state and concurrency
- Select expression
- Further reading
Coroutine basics
この章では coroutine 概念の基本を取り扱う
Your first coroutine
最初の coroutine
以下のコードを実行すると:
fun main(args: Array<String>) {
launch { // launch new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
You can get full code here
以下の結果が得られる:
Hello,
World!
重要な事は、コルーチンは軽量スレッドだってことだ。launch coroutine builder によって起動される。
同じ結果は、launch { ... }
を thread { ... }
に、 delay(...)
を Thread.sleep(...)
に置換しても得る事ができる。
(launch
を thread
に置換したタイミングで以下のエラーが発生する:
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
これは、delay が launch の中でしか使えないために起こる。 delay
は特殊な suspending function でスレッドを ブロックしない が、coroutine を suspends
する。)
Bridging blocking and non-blocking worlds
最初の例で、は non-blocking の delay(...)
と blocking Thread.sleep(...)
が混じっていた。
こう言うコードでは簡単にどこがブロッキングで、どこがそうで無いかを見失ってしまう。
明示的な blocking を runBlocking coroutine builder を使った例は以下の通り:
fun main(args: Array<String>) {
launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main thread continues here immediately
runBlocking { // but this expression blocks the main thread
delay(2000L) // ... while we delay for 2 seconds to keep JVM alive
}
}
You can get full code here
結果は同じで、ただコードが non-blocking
delay を使うようになっただけだ。
runBlocking
を呼び出した main スレッドは、runBlocking
の中身が終了するまでブロックする。
この例はもっと Kotlin らしく以下の用に書き換えられる。runBlocking
でmain 文全体を囲む効果がある。
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main coroutine continues here immediately
delay(2000L) // delaying for 2 seconds to keep JVM alive
}
You can get full code here
この runBlocking<Unit> { ... }
は top-level の main coroutine を開始するアダプターのように働く。
suspending functions の unit-tests を書くときにもこのやり方は使える
class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking<Unit> {
// here we can use suspending functions using any assertion style that we like
}
}
Waiting for a job
他の coroutine の完了を待つために delay するのはいいや来方じゃない。non-blocking
なやり方で、明示的な wait っていうのはこうだ:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { // launch new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // wait until child coroutine completes
}
You can get full code here
結果は同じだが、main coroutine は待ち時間に縛られないようになった。このほうがよい
Extract function refactoring
launch { ... }
の中のコードをリファクタリングして分離することを考える。IntelliJ の(使ってるよね?) "Extract function" を使えば suspend
が付いた新しい関数が出来上がる。これが人生初めての suspending function だ。 Suspending functions はコルーチンの中で通常関数と同じように使える。それが特別なのは関数の中でコルーチンを suspend できるってことだ。以下は spspending function の中で delay
している。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { doWorld() }
println("Hello,")
job.join()
}
// this is your first suspending function
suspend fun doWorld() {
delay(1000L)
println("World!")
}
You can get full code here
Coroutines ARE light-weight
以下のコードを見て欲しい:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) { // launch a lot of coroutines and list their jobs
launch {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // wait for all jobs to complete
}
You can get full code here
100K (10万) コルーチンを起動して、それぞれのコルーチンで "." を表示するやつだ。
launch
を thread
に、delay
を Thread.sleep
に書き換えたらほとんどのマシンで OOM (out-of-memory) Error になるだろう。
Coroutines are like daemon threads
コルーチンはデーモンスレッドのようなやつだ。次のコードは長期間実行されるコルーチンを main 分の中で実行する。 coroutine は "I'm sleeping" を表示して、3回表示され終わるタイミングで main 文は終了していることが解る。
fun main(args: Array<String>) = runBlocking<Unit> {
launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // just quit after delay
}
You can get full code here
出力は3回目で終わる。
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
まとめ:アクティブな coroutine はプロセスの終了を妨げることはない。つまり daemon threads のように働く。
Cancellation and timeouts
実運用で重要なキャンセルとタイムアウトについて
Cancelling coroutine execution
小さな main
関数だけで終わるようなアプリケーションにとっては、全ての coroutine が勝手に終了されるというのは良いアイディアのように聞こえるが、大きな、長期間稼働するサーバーアプリではよりきめ細やかな制御が必要になる。launch
関数は Job
型の変数を返し、起動中のコルーチンをコントロールできる:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
You can get full code here
アウトプットは以下のようになる:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
job.cancel
が呼び出された後は、coroutine から他の出力がなくなるのが解る。何故ならキャンセルされたから!
ちなみに、cancel と join を同時に行う cancelAndJoin メソッドもある。
Cancellation is cooperative
キャンセルと協調動作について
Coroutine のキャンセルは 協調的 でありうる。Coroutine はキャンセル可能にしておくべきだ。
全ての suspending functions in kotlinx.coroutines
は cancellable である。
彼らはコルーチンのキャンセル状態をチェックしてキャンセルされていれば CancellationException を throw するように実装されている。
ただし、coroutine が処理中で cancel 状態をチェックしない場合、キャンセルされない。以下の例を参照:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
You can get full code here
このれいで解るように、cancel がされても while ループの中で interrupt されることはない。
Making computation code cancellable
計算処理コードを cancellable にする方法は二つある。一つは定期的に suspending function を呼び出してキャンセル状態かチェックすること。
この目的のためには yield ファンクションがいいだろう。
もう一つは明示的に cancellation status をチェックすることで、以下が例になる。
先の例の while (i < 5)
を while (isActive)
に置き換えて起動すると
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
You can get full code here
実行結果をみれば解るように、loop はキャンセルされた、isActive は
CoroutineScope object を経由して、ブロックの中で利用可能なプロパティである。
Closing resources with finally
キャンセル可能な suspending function は キャンセルされると CancellationException
を Throw して、この例外は通常の方法で対処可能である。
例えば, try {...} finally {...}
構文や、Kotlin の use
拡張関数で coroutine がキャンセルされたときの終了処理を行える:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
You can get full code here
join と cancelAndJoin は両方 finalization action が完了するのを待つ。上の実行例は以下の用になる:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
Run non-cancellable block
coroutine がキャンセルされた場合、finnaly
の中で suspending function を実行しようとしても CancellationException が発生するだけになる。
多くの場合、これは問題ではない、何故ならほとんどの closing operation (closing a file, cancelling a job, or closing any kind of a
communication channel) は non-blocking (not suspending function
)だから。
しかし、時として finally 内で suspend function を呼びたいかも知れない。そのような時の為に
withContext(NonCancellable) {...}
と書くことでブロックを保護できる。
見ての通り withContext function と NonCancellable context の組み合わせで、以下がその例:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
You can get full code here
Timeout
実環境で最も頻繁に発生する、coroutine の実行をキャンセルしたい理由は、実行時間を超過したためだろう。
手作業で Job の実行と実行時間をモニタリングするのではなくて、スグに利用可能な withTimeout 関数があるのでこれを使って欲しい。
以下が例である:
fun main(args: Array<String>) = runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
You can get full code here
It produces the following output:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
タイムアウトの場合、CancellationException
のサブクラスである TimeoutCancellationException
が throw される。
我々はまだそんな例外のスタックトレースを見たことが無いが、それはcoroutine の中で CancellationException
は通常の coroutine 完了の理由として扱われるからである。
しかし、この例では withTimeout
を main
関数直下で使っているので、それが見えることになる.
cancellation はただの例外なので、全てのリソースは通常通り close される。
タイムアウトの時だけ特別な処理をしたいと思ったら、 try {...} catch (e: TimeoutCancellationException) {...}
block でタイムアウト処理を囲えば良い。
または、withTimeoutOrNull 関数を使えば、withTimeout と同じに終了してくれるが、タイムアウト時に例外では無く null
を返してくれるようになる:
fun main(args: Array<String>) = runBlocking<Unit> {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}
You can get full code here
表示は以下の用に、例外では無く null が帰る
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
Composing suspending functions
suspending functions を組み合わせる
このセクションでは、suspending functions を組み合わせる方法について述べる
Sequential by default
2つの suspending function を実行する必要があると思って欲しい。例えばリモートサーバーに行って値を取ってくるなんていう関数だ。
今回は仮に1秒 delay して整数を返す関数を用意する。
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
最初に doSomethingUsefulOne
を実行して その後に doSomethingUsefulTwo
を呼び出す必要がある場合、どのようにしたらいいか?
実際は、最初の関数の結果が二つ目の関数を呼び出す必要があるかどうかの判断に使える。
このコードは coroutine の中にあるので、ただ単に通常の順次呼び出しと同じやり方でやれば、順次呼び出しになる。
次の例はそのやりかたで合計時間を計算する例だ:
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}
You can get full code here
It produces something like this:
The answer is 42
Completed in 2017 ms
Concurrent using async
仮に(この場合も) doSomethingUsefulOne
と doSomethingUsefulTwo
に依存関係がなければ、
両方同時に呼び出してより早く答えを得たいとおもいますよね? そのために async が助けになる。
概念的には、async は launch のようなものだ。軽量スレッドであるコルーチンを新しく立ち上げて(※訳注)、
他の coroutine と平行して、その coroutine 内で処理を行う。launch との違いとして、launch は Job を返して返値が無いが、
async
は Deferred を返す。Deferred
は軽量 / non-blocking な future (Java の future
)で、結果を後で返すための promise
の表現だ。
Deferred な値を後から取得する為に .await()
を使える。
Deferred is a Job であり、必要に応じて cancel できる。
※訳注:coroutine としては分離しているが、非分離スレッドで実行もできる(Guava の MoreExecutors.directExecutor()
のイメージ)。というか Microservices freak は同じスレッドで async して初めての suspending function の呼び出しで分離して欲しいと思っているはず
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
You can get full code here
It produces something like this:
The answer is 42
Completed in 1017 ms
二倍早くなった!何故なら二つの coroutine を並列実行するようにしたからだ。
覚えておいてほしい: coroutine を使った 平行性は常に明示的に(この場合 async
を使って)行う。
Lazily started async
async
には遅延実行の機能もある。start
オプションに CoroutineStart.LAZY
を渡す.
この場合 coroutine の実行は、値が確実に必要と解る await
が呼ばれたタイミングか start
関数が呼ばれたタイミングまで遅延される:
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
// some computation
one.start() // start the first one
two.start() // start the second one
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
You can get full code here
It produces something like this:
The answer is 42
Completed in 1017 ms
この例では、2つのコルーチンが定義されるが、直ちには実行されず、プログラマーにそれらの実行をいつ始めれば良いかが委ねられる。
ノート: 我々が await
を println の中で呼んだタイミングでまだ start
が呼ばれていなくても,
await
が呼ばれたタイミングで計算が順次実行されるが、これは明らかに LAZY のユースケースではないだろう。
async(start = CoroutineStart.LAZY)
のユースケースは標準的な遅延関数の置き換えで、計算に suspending function が必要となる場合に利用される。
Async-style functions
async
coroutine builder を使えば、
doSomethingUsefulOne
や doSomethingUsefulTwo
を非同期に呼び出す async style function を定義することができる。
このような関数に Async
接尾辞を付けるのは良いスタイルだ。
"Async" 接尾辞は、その関数が非同期計算を開始し、deferred で結果を返すことを解りやすくする。
// The result type of somethingUsefulOneAsync is Deferred<Int>
fun somethingUsefulOneAsync() = async {
doSomethingUsefulOne()
}
// The result type of somethingUsefulTwoAsync is Deferred<Int>
fun somethingUsefulTwoAsync() = async {
doSomethingUsefulTwo()
}
ノート:これらの xxxAsync
関数は suspending functions ではない. これらの関数はどこからでも利用できる。
ただし、返値は常に非同期で(ここでは 並列性を持って)実行される事を意味している。
それらの関数を coroutine の外で利用する例を示す。呼び出しは外で行うが、.await()
は runBlocking {...}
内で呼び出す必要がある:
// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
val time = measureTimeMillis {
// we can initiate async actions outside of a coroutine
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// but waiting for a result must involve either suspending or blocking.
// here we use `runBlocking { ... }` to block the main thread while waiting for the result
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
You can get full code here
Coroutine context and dispatchers
coroutine context (coroutine 文脈)とディスパッティについて
coroutine は、常に、CoroutineContext
型で表現される context 上で実行される。CoroutineContext は Kotlin 標準ライブラリの一部だ。
Coroutine context はいくつかの変数の組み合わせである。特に重要な要素は以前見たコルーチンの Jobと、この章で述べるその dispatcher だ。
Dispatchers and threads
ディスパッチャーとスレッド
Coroutine コンテキストは coroutine dispatcher を含んでいる。dispatcher によってどの thread(s) 上で coroutine を実行すれば良いのかを決定している。Coroutine dispatcher は coroutine が特定のスレッド、スレッドプールまたは confined の場所のどこで実行されるかを制御できる。
全ての coroutines builders 、例えば launch や async は任意引数として
CoroutineContext
パラメータを取る。CoroutineContext は明示的に dispatcher を指定する時に使える。
次の例を実行して欲しい:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
(個人メモ、coroutineContext でこうなるのはへぇという感じ)
You can get full code here
It produces the following output (maybe in different order):
'Unconfined': I'm working in thread main
'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main
以前のセクションで利用していた default dispatcher は DefaultDispatcher で、これは現在の実装では CommonPool と等しい。つまに launch { ... }
って書くのは launch(DefaultDispatcher) { ... }
と、つまり launch(CommonPool) { ... }
と同じ.
parent
coroutineContext と
Unconfined context の違いはこの後スグ出てくる。
Note: newSingleThreadContext は新しいスレッドを作り、これはすごいリソースを消費する。
実際のリアルアプリケーションでは、これは不要になったら close
関数をつかってリリースされるか、top-level の変数で引き回して再利用する必要がある。(訳注:つまり、ループや処理のために new Thread するのは止めよう!)
Unconfined vs confined dispatcher
Unconfired v.s. confined dispatcher
Unconfined coroutine dispatcher は coroutine を caller thread で始める。ただし、最初の suspension point までだ。Suspension が終わると、呼び出した suspending function によって resume が実行されるスレッドが決まる。Unconfined
dispatcher は、coroutine が CPU time を消費せず、UI のように特殊なスレッドに束縛されている共有データをアップデートしないときにに適切な dispatcher だ。
一方で、coroutineContext がある。これはどんな coroutine でも利用可能なプロパティで、
この特定の coroutine context の参照である。
この方法で、parent context を継承することができる。runBlocking
coroutine のデフォルトの dispatcherは、特に、呼び出しスレッドに縛り付けられ、これを継承することで実行を予測可能な FIFO 順序でこのスレッド上で行う効果がある。
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
delay(500)
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
You can get full code here
Produces the output:
'Unconfined': I'm working in thread main
'coroutineContext': I'm working in thread main
'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'coroutineContext': After delay in thread main
runBlocking {...}
の coroutineContext
を継承した coroutine は main
thread で実行を継続して(ログの4行目)、
一方で unconfined は delay
関数が使っている default executor 上で処理を継続している。
Debugging coroutines and threads
coroutine と thread のデバッグ
Coroutines は特定の thread で動き、Unconfined
ディスパッチャや multi-thread dispatcher であれば他の thread 上でresumeする。single-thread dispatcher のときでも、coroutine が実行中かどうかを確認することは難しい。アプリケーションをデバッグする共通のアプローチとして、thread name をログファイルの書く行に出力しておく、というのがある。この機能は広くログフレームワークでサポートされている。coroutine を使う時には、thread 名だけでは多くを教えてくれないこともあり、 kotlinx.coroutines
はこれらを簡単にする仕組みを提供している。
以下のコードを -Dkotlinx.coroutines.debug
JVM option と共に実行してみて欲しい:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking<Unit> {
val a = async(coroutineContext) {
log("I'm computing a piece of the answer")
6
}
val b = async(coroutineContext) {
log("I'm computing another piece of the answer")
7
}
log("The answer is ${a.await() * b.await()}")
}
You can get full code here
このコードには 3 つのコルーチンがある。main coroutine (#1) -- runBlocking
が作ると async
が作った a
, b
である。全てのコルーチンは runBlocking
context の中で実行されて main thread に束縛されている。出力コードは以下の用になる。
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
log
関数はスレッドネームを出力するが、デバッグフラグが付いている時にはそこに実行中の coroutine が追記されている。
他の debugging facilities は newCoroutineContext のドキュメントの中に記述がある。
Jumping between threads
thread 間のジャンプ
以下のコードを -Dkotlinx.coroutines.debug
JVM option 付きで実行してみて欲しい:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
You can get full code here
このコードには、まだ紹介していない新しい機能が入っている。
- runBlocking を特定の context を明示して使っている
- withContext を特定の context に切り替えるために使っている。同じ coroutine にいるままで
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
ノート:これは Kotlin 標準ライブラリに含まれている use
関数の説明にもなっている。この関数は newSingleThreadContext で作られたスレッドが不要になったタイミングで削除している。
Job in the context
context 中の Job
Coroutine の Job は context の一部である。使用されている coroutine から、context を取得する事ができる。これには
coroutineContext[Job]
を使う:
fun main(args: Array<String>) = runBlocking<Unit> {
println("My job is ${coroutineContext[Job]}")
}
You can get full code here
It produces something like that when running in debug mode:
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
CoroutineScope の isActive は次のショートカットだ
coroutineContext[Job]?.isActive == true
.
Children of a coroutine
coroutine の子ども
coroutine の coroutineContext
が他の coroutine を起動するのに使われると、新しい coroutine の Job
は親の coroutine Job の 子ども になる。
親の coroutine が cancel されると、その子の coroutine も再帰的に cancel される。
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
// it spawns two other jobs, one with its separate context
val job1 = launch {
println("job1: I have my own context and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
// and the other inherits the parent context
val job2 = launch(coroutineContext) {
delay(100)
println("job2: I am a child of the request coroutine")
delay(1000)
println("job2: I will not execute this line if my parent request is cancelled")
}
// request completes when both its sub-jobs complete:
job1.join()
job2.join()
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
You can get full code here
The output of this code is:
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
つまり launch(coroutineContext)
をしている job2
は cancel propagation された。
Combining contexts
context を結合する
Coroutine contexts は +
operator を使うことで結合できる。右側の context が対応する右側 context の entry を置き換える。例えば、親 coroutine の Job
を継承しながら、その dispatcher を置換するには:
fun main(args: Array<String>) = runBlocking<Unit> {
// start a coroutine to process some kind of incoming request
val request = launch(coroutineContext) { // use the context of `runBlocking`
// spawns CPU-intensive child job in CommonPool !!!
val job = launch(coroutineContext + CommonPool) {
println("job: I am a child of the request coroutine, but with a different dispatcher")
delay(1000)
println("job: I will not execute this line if my parent request is cancelled")
}
job.join() // request completes when its sub-job completes
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
You can get full code here
The expected outcome of this code is:
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
Parental responsibilities
保護者の責任
親 coroutine は常に子 coroutine の実行を待つ。親は明示的に子をトラックする必要は無く、待つために join
を使う必要も無い:
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
repeat(3) { i -> // launch a few children jobs
launch(coroutineContext) {
delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join() // wait for completion of the request, including all its children
println("Now processing of the request is complete")
}
You can get full code here
The result is going to be:
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
Naming coroutines for debugging
デバッグのための名前付けされた coroutine
自動的に割り当てられた id は coroutine のログが頻繁に存在して、単に同じ coroutine のログをまとめて見たいときには十分だが。ただし、 coroutine が特定のリクエストの処理に縛られたり特定のバックグラウンドタスクを実行しているときには、デバッグのために明示的な名前を付けたほうがいいだろう。CoroutineName
context element は thread name のように名前を保存する。またそれはdebugging mode が on のときには thread name の一部として表示される。
次のコードがこのコンセプトだ:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
log("Started main coroutine")
// run two background value computations
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("Computing v1")
252
}
val v2 = async(CoroutineName("v2coroutine")) {
delay(1000)
log("Computing v2")
6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
You can get full code here
The output it produces with -Dkotlinx.coroutines.debug
JVM option is similar to:
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
Cancellation via explicit job
明示的な job を経由したキャンセル
context, 子 そして jobs を一緒に使うための、我々の knowledge を書かせて欲しい。
我々のアプリケーションはライフサイクルのあるオブジェクトを持っており、そのオブジェクトは coroutine ではない。例えば、Android application を書いている時は様々な coroutine を Android activity の中で起動し、それはデータを fetch / upload, animation するような非同期処理を実行する。このような coroutine はどれも activiety が destoroy されたらメモリーリークを避ける為に cancel されなければならない。
我々は coroutine のライフサイクルを Job
型のインスタンスを作成して activity のライフサイクルと紐付けておくことで事で管理することができる。Job()
factory function を使うことで作られた job のインスタンスを次に示す。簡略化するために、launch(coroutineContext + job)
を使うのではなく launch(coroutineContext, parent = job)
明示的に parent job が使われることを記述出来る。
今、単一の Job.cancel
は全ての起動された子を cancel する。更に、Job.join
は全てが完了するのを待ち、同じく cancelAndJoin
も使うことができる:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = Job() // create a job object to manage our lifecycle
// now launch ten coroutines for a demo, each working for a different time
val coroutines = List(10) { i ->
// they are all children of our job object
launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
}
println("Launched ${coroutines.size} coroutines")
delay(500L) // delay for half a second
println("Cancelling the job!")
job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
}
You can get full code here
The output of this example is:
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!
最初の3つのコルーチンが print されて残りは job.cancelAndJoin()
でキャンセルされた。
つまり必要なのは Activity が作られたときに Job を作成して、coroutine を作るときにその子として作り、activity が destoroy されるときに Job を cancel することってことになる。Android lifecycle の場合、子 coroutine は非同期なので join することはできない。しかし join できるということはバックエンドサービスでは、紐付けられたリソースの利用を確かにするという意味で有用だ。
Channels
チャネル
Deferred な値は単一の値を coroutine 間で簡単に引き渡す方法を提供した。
チャネルは、複数の値をストリームで転送する方法を提供する。
Channel basics
Channel
はコンセプトとしては BlockingQueue
に非常に似ている。最大の違いは、pub
オペレーションが blocking なのに対して、send
オペレーションは suspending だってことで、同じく take
オペレーションは blocking だが、receive
は suspending だ。
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}
You can get full code here
The output of this code is:
1
4
9
16
25
Done!
Closing and iteration over channels
クローズと channel のイテレーション
quque と違って、channel はもうエレメントが到達しない事を示すために close することができる。
受信側では、通常の for
ループを使って要素を受け取るのが便利だ。
概念的に、close
は close ようの特別なトークンを Channel に送信しているのようなものだ。
特殊トークンが受信されると、イテレーションはそこで止まり、同時に特殊トークン以前の全てのトークンが受信されていることが保証される:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}
You can get full code here
Building channel producers
Channel 生成源を作成する
element のシーケンスを生成する coroutine を作るパターンはほとんど同じ。
producer-consumer パターンの一部で、平行コードで頻繁に見ることができる。
たぶんこのように抽象化できる:channel をパラメーターとして取る関数関数があって、通常関数が結果を返さないと行けないのと逆で、関数の中で値が生成される。
このパターンに対応するため produce
という名前の coroutine builder があり、producer の右にラムダ式を書くだけで簡単に channel を作成する事ができる。そして channel の拡張関数である consumeEach
は consumer side の for loop を置き換えてくれる。
fun produceSquares() = produce<Int> {
for (x in 1..5) send(x * x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
You can get full code here
Pipelines
パイプライン
パイプラインは、一つの coroutine が、潜在的に無限のデータストリームを生成するパターンである。
fun produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
他の coroutine または coroutines が stream を consuming し、プロセッシングして、必要に応じて他の値を生成したりする。以下にいくつかの例を示す:
fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (x in numbers) send(x * x)
}
The main code starts and connects the whole pipeline:
fun main(args: Array<String>) = runBlocking<Unit> {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
for (i in 1..5) println(squares.receive()) // print first five
println("Done!") // we are done
squares.cancel() // need to cancel these coroutines in a larger app
numbers.cancel()
}
You can get full code here
coroutine を close するのは、exmaple app のような小さなアプリケーションでは必須では無い。
なぜなら
coroutines are like daemon threads なので、終了時に同時に終了するから。
だが、大きなアプリケーションでは、不要なパイプラインは停止しておく必要がある。
その代わりに、pipeline coroutine を次の例で示すような
children of a main coroutine とする方法もある.
Prime numbers with pipeline
pipeline を使った素数の列
coroutine の pipeline を使った素数を生成する例を使って、パイプラインの極端な例を見てみましょう。
まずは無限の sequence of numbers から初める。ここでは、caller が coroutine がどこで実行されるかを選択するための、明示的な context
パラメーターを produce builder に渡す方法を使う:
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
次の pipeline ステージでは、与えられた素数で割りきれる数字を filter する。
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
for (x in numbers) if (x % prime != 0) send(x)
}
さて、prime number stream を作成するために、2 から初めて、prime number を取得し、取得した prime number を使って新しい stream を作って行くことができる:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
以下の例は最初の 10 個の prime number を表示する。
全ての coroutine は main thread の context で実行される。全ての coroutine は main runBlocking coroutine であり、そのcoroutineContext の子として起動されるため、明示的な全ての coroutine リストを持たなくても、 cancelChildren 拡張関数を呼び出すことでそれらを cancel することができる。
fun main(args: Array<String>) = runBlocking<Unit> {
var cur = numbersFrom(coroutineContext, 2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(coroutineContext, cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
You can get full code here
The output of this code is:
2
3
5
7
11
13
17
19
23
29
ノート: 同じパイプラインを標準ライブラリの
buildIterator
coroutine builder を使って作る事ができる。
produce
を buildIterator
で、send
を yield
で、 receive
を next
で、
ReceiveChannel
を Iterator
で、そしてそのコンテクストをりようしてだ。
この場合 runBlocking
は必要無い。
しかし、上で示したような channel を使った pipeline の利点は、CommonPool
context を使ったら実際は複数コアを使うことができる点だ。
何れの場合も、これは極端にインパクトのある素数列を生成する例だった。
実環境では、パイプラインは他の suspending function を呼んで (remote services への非同期呼び出しなど)、それらのパイプラインは buildSequence
/buildIterator
では構築できないものになるだろう。なぜならそれらは完全に非同期である produce
と違ってsuspension を許していないからだ。
(個人メモ:なぜ?)
Fan-out
ファンアウト
複数の coroutine は同じ channel からイベントを受信するかもしれない。
まずは、1秒で10個の整数を定期的に生成する producer coroutine から始めよう:
fun produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
そして、複数の processor coroutine (= consumer coroutine) も持つことができる。この例では単に coroutine id と受信した整数を println するだけだ:
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
では、5個の processor を約 1 秒間稼働させよう。何が起こるか見てみよう:
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
You can get full code here
結果は次の用になるはずだ。それぞれの整数を受信した processor id は違うだろうが。。。:
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
ノート: producer coroutine が channel を close すると、他の processor coroutines による channel のイテレーションも順次終了する。
また、launchProcessor
の中で、どのように明示的に channel をイテレートしているかにも注意して欲しい。consumeEach
と違って、for
loop pattern は複数の coroutines に対して完全に安全である。もし一つの coroutines が異常終了しても、他が channel の processing を継続できるが、consumeEach
によって書かれた processor はその正常または異常終了時に、利用している channel を消費(cancel)してしまう。
Fan-in
ファンイン
複数の coroutines が同じチャネルに書き込んでもよい。たとえば、String の channel があって、suspending function が指定されたインターバルでもって定期的に同じチャネルを書き込むような場合:
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
さあ、string を送り込む複数の channel を起動して何が起こるかみてみよう。
(この例では、main thread の子どもになるように main thread の corotuineContext で coroutine を起動している)
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo", 200L) }
launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
You can get full code here
The output is:
foo
foo
BAR!
foo
foo
BAR!
Buffered channels
バッファされたチャネル
今までに見た channel はどれもバッファを持っていなかった。
バッファされていないチャネルは sender と receiver が receiver がお互いに出会った(ランデブー、した)タイミングで要素を転送する。
もし送信が先に行われたとすると、要素が受信されるまで sender は suspend するし、逆に receive が先に行われると、send が呼ばれるまで suspend する。
Channel()
のfactory function と produce
builder はオプションで buffer size を指定する capacity
パラメータを取る。
このバッファーは sender が suspend するまえに複数の要素を送る事を可能にする、BlockingQueue
のようなやつ。
BlockingQueue
で capacity が満タンになると block するのと同じような形で、channel は buffer が満タンになったタイミングで初めて suspend する。
次のコードで挙動を見てみよう:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // create buffered channel
val sender = launch(coroutineContext) { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
}
You can get full code here
"sending" が 5 回表示されたはずだ。:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
キャパシティが 4 なので4回の send が成功し、5回目の呼び出しで Block した。(println は send の前にあることに注意)
Ticker channels
ティッカーチャネル(ティッカー = 心拍、あるいはチカチカするもの)
Ticker channel は特別なランデブー channel で、最後に element が consume されてから一定期間毎に Unit
要素を produce するようなものだ。
これだけでは使い道がないような気がするが、windowing や他の時間ベースの処理をするために、複雑な time-based produce
pipeline を構築・運営するために有益である。
Ticker channel は select
の中で、"on tick" 処理を実行するのにも使える。
この処理を実装する為に ticker
factory method が使える。
追加の element が必要無いことを明示的に示すためには、cancel
する必要がある。
それでは、例を見てみよう:
fun main(args: Array<String>) = runBlocking<Unit> {
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
You can get full code here
It prints following lines:
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
ノート: デフォルトで ticker
は consumer による pause を明に扱い、要素の間隔を一定に保つためにポーズが発生すると次のelementの発生を早める。
オプションで、mode
パラメーターに TickerMode.FIXED_DELAY
を設定すれば、この挙動を element 間で fixed delay を保つように変更できる。
Channels are fair
Channel は公正(fair share scheduling)です!
channel に対する、複数スレッドからの送受信操作は 公正 である。またそれらは first-in first-out 順序で配信される。
つまり、最初に receive
を実行した coroutine が最初に要素を受け取る。次の例は 2 つの "poig" "pong" の2つの coroutine が
共有の table channel から "ball" を receive する例だ。
data class Ball(var hits: Int)
fun main(args: Array<String>) = runBlocking<Unit> {
val table = Channel<Ball>() // a shared table
launch(coroutineContext) { player("ping", table) }
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
You can get full code here
"ping" coroutine が最初に起動されるので、最初に ball を receive するのは "ping" になる。
"ping" coroutine がボールをsend した後に、直ちにレシーブを開始しても、次に ball は "pong" coroutine でレシーブされる。
なぜなら、"ping" coroutine が待つより先に "pong" が待っていたからである。
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
ノート: たまに元になっている executor が実行中の場合に、 channel は unfair に見える挙動をすることもある。
詳細はこの Issue を参照してほしい。
Shared mutable state and concurrency
共有された変更可能な状態と concurrency
Coroutine は CommonPool
のような multi thread dispatcher 上で動く場合、平行実行され得る。
そしてそれは、よくある平行プログラミングの問題を引き起こす。
主な問題は 変更可能な共有状態 への同期化されたアクセスである。
coroutine の世界におけるこの問題に対するいくつかの解決策は、multi-thread の世界における解決策と一緒だ、ただ他の物は独特である:
The problem
問題の定式化
1千 coroutine を起動して、全ての coroutine に 1千回処理をさせることを考える(合計で100万処理だ)
それと同時に、比較の為に実行時間を取得しておこう:
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch(context) {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
まず始めに、共有の変数を multi-thread CommonPool
context を使って increment していくシンプルな action から始めよう。
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
You can get full code here
最終的に何が見えた? "Counter = 1000000" にはほど遠かっただろう。なぜなら 1000 の coroutine が同期化されることなく counter
を平行でインクリメントしたからである。
ノート: もし CPU 数が2より少ない古いシステムを使っていたら、1000000
が見えたかも知れない、何故なら CommonPool
はこの場合1つのスレッドしか使わないからだ。
この結果を再現するためには、以下のコードを利用する必要がある。:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
counter++
}
println("Counter = $counter")
}
You can get full code here
Volatiles are of no help
Volatiles は助けにならない
よくある誤解として volatile
を付けておけば平行性の問題が解決されるというのがある。やってみよう:
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
You can get full code here
コードは遅くなったが、まだ "Counter = 1000000" にはならない。何故なら volatile は対象変数への読み取りと書き取りを
直列化 (または技術的には "atomic") にする事は保証するが、それ以上の大きな動作(increment in our case)は atomic にしてくれないからだ。
Thread-safe data structures
Thread-safe なデータ構造
thread と coroutine で共に動く一般的な解決策は、thread-safe (synchronized, linearizable または atomic な)データ構造を使うことだ。
データ構造が、共有状態上で行われる処理に対応した synchronization を全て実施してくれる。
単純なカウンターであるこの場合では、我々は incrementAndGet
操作がある AtomicInteger
クラスを使える:
var counter = AtomicInteger()
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}
You can get full code here
これは、この問題に関しては最速の解決策だ。
単純な counter や、collection, queue やその他標準的なデータ構造と基本的な操作に関しては標準のライブラリが解決策になる。
ただし、これはthread-safe な実装が用意されていない、複雑な状態やオペレーションには簡単にはスケールしない。
Thread confinement fine-grained
きめ細やかなスレッド制約
細やかな thread confinement は共有可変変数へのアクセスの問題を、特定のスレッドに限ることで解決する解決策だ。
これは、 UI application でよく使われている。UI state は特定の single event-dispatch/application thread に confined だ。
この方法は、single-threaded context を利用する事で簡単に coroutine にも適用する事ができる:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // run each coroutine in CommonPool
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
}
You can get full code here
むっちゃ遅い! 何故ならこれは きめ細やかな thread-confinement だからだ。インクリメントの度に multi-threaded CommonPool
context から withContext を使った single-threaded context への切り替えが発生する。
Thread confinement coarse-grained
大域的 thread 保証
実際には、thread 保証は大きな固まりで行われることが多い。例えば、状態を変更する大きな business logic を single thread に縛り付けるなど。
以下の例はそのようなことをやっている、それぞれの coroutine を一つの thread context の中で動かす:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(counterContext) { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
}
You can get full code here
(fine-grained より)もっと早くて、正確な値が取得できるようになった!
Mutual exclusion
排他制御
排他制御による解決は、全ての共有状態への変更を平行して実行されない critical section で保護するやり方だ。
blocking world では、よく synchronized
や ReentrantLock
をこのために使うんじゃないかな。
Coroutine の代用手段は Mutex と呼ばれている。これは lock and unlock 関数を持って critical section を区分することができる。
Mutex.lock()
の最大の違いは、これは suspending function だってことだ。つまり OS thread を block することはない!
withLock 拡張関数が、 mutex.lock(); try { ... } finally { mutex.unlock() }
パターンの簡単な表現なので使うと良い。
val mutex = Mutex()
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
You can get full code here
この例のロックは粒度が細かく、そのために実行時間も長くかかっている。
ただし、君が絶対に共有状態を定期的に更新しなければならなくて、束縛するのに適当なスレッドが無い・作成したく無い場合はこの例はいい選択だ。
Actors
actor は以下の3つの要素からなる entity である
- coroutine
- その coroutine の中に隠蔽されて束縛された状態(state)
- 他の coroutine とコミュニケーションを取るための channel
シンプルな actor は関数として書くことができるが、複雑な状態を持った actor はクラスとして表現するのが妥当だろう。
Kotlin は actor coroutine builder を持っている。
読んで字のごとく、スコープ内にメッセージ受け入れるために actor の mailbox channel をスコープ内に簡単に取り込み、
また送信 channel を結果 job object に結びつけて、handle のように取り回し可能な actor の単一参照を作る
actor を使う最初のステップは、actor が処理しようとしているメッセージをクラスとして定義することだ。
Kotlin の sealed classes はこの目的によく合致する。
ここでは CounterMsg
sealed class を定義して、同時にカウンターをインクリメントする IncCounter
メッセージと
値を取得する為の GetCounter
メッセージを実装する。
後者はレスポンスを送信するために必要な物だ。
CompletableDeferred communication primitive は、受け手で処理したときに単一要素を受信する処理を表現し、
この目的ではレスポンスを actor から送信するために利用される。
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
これを使って、actor coroutine builder を呼び出す関数を定義しよう
// This function launches a new counter actor
fun counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
main code は直感的に以下の用になる:
fun main(args: Array<String>) = runBlocking<Unit> {
val counter = counterActor() // create the actor
massiveRun(CommonPool) {
counter.send(IncCounter)
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
You can get full code here
actor 自信がどんな context で実行されているのかは正確性にとって問題で無くなる。
Actor は coroutine であって、coroutine は順次実行され、特定のコルーチンへ状態を束縛することは、共有可変状態の問題への解決策として働く。
加えて、actor は彼ら自身の private 状態を変更することもでき、しかしロックを避ける為に他の actor へはメッセージを通じてのみ影響を及ぼせる。
actor はロードがかかっているときには locking よりも効率の良い手段である。なぜならそのような場合では常にやるべき作業があり、
しかし他の context にスイッチする必要が全く無いからだ。
(約:channel に絶え間なく send される状況下では、micro thread の切り替えすら発生しない、と言うようなことを言っている?)
ノート: actor coroutine builder は produce coroutine builder が二重になったものだ。
Actor はメッセージを受信する channel と紐付けられて、producer はメッセージを送信する channel に紐付けられる。
Select expression
Select 式
Select 式は複数の suspending 関数を同時に await することを可能にし、
最初に利用可能となったものを select (選択) する。
Selecting from channels
複数の channel からの select
fizz
buzz
という文字からなる二つの producer があるとしよう。fizz
は 300ms 毎に "Fizz" を produce する:
fun fizz(context: CoroutineContext) = produce<String>(context) {
while (true) { // sends "Fizz" every 300 ms
delay(300)
send("Fizz")
}
}
bazz
は 500ms 毎に "Buzz!" を produce する:
fun buzz(context: CoroutineContext) = produce<String>(context) {
while (true) { // sends "Buzz!" every 500 ms
delay(500)
send("Buzz!")
}
}
receive suspending function を使うと、我々は どちらかの channel からのメッセージの待ち状態へ移行することしか。
しかし select 式では、その onReceive 節を利用する事で、 両方 の待ち状態とすることができる:
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> { // <Unit> means that this select expression does not produce any result
fizz.onReceive { value -> // this is the first select clause
println("fizz -> '$value'")
}
buzz.onReceive { value -> // this is the second select clause
println("buzz -> '$value'")
}
}
}
全体を 7 回実行してみよう:
fun main(args: Array<String>) = runBlocking<Unit> {
val fizz = fizz(coroutineContext)
val buzz = buzz(coroutineContext)
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}
You can get full code here
このコードの結果は以下のようになる:
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
Selecting on close
select
中の onReceive 節は channel が close されていると、select は対応した例外を投げる。
onReceiveOrNull 節を使うと、channel が close されていた場合に特定の挙動をさせることができる。
次の例は、select
が選択した節の結果を返却するような例となっている
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
"a -> '$value'"
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
"b -> '$value'"
}
}
a
を 4回 "Hello" メッセージを produce する channel で、
b
を 4回 "World" メッセージを produce する channel とする:
fun main(args: Array<String>) = runBlocking<Unit> {
// we are using the context of the main thread in this example for predictability ...
val a = produce<String>(coroutineContext) {
repeat(4) { send("Hello $it") }
}
val b = produce<String>(coroutineContext) {
repeat(4) { send("World $it") }
}
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
You can get full code here
この結果は興味深い物になるので、何が起こるか詳しく分析してみよう:
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
何個かここから観測できることがある、
まず select
は最初の節への 偏り があるということだ。同時に複数の channel が選択可能になった場合、最初のものが select される。
ここでは、両方の channel が同時に文字を produce しているので、a
チャネル、つまり select
の最初の節に書かれている、が勝者となる。
しかし、ここでは unbeffered channel を利用しているため、a
が send の呼び出しによって複数回 suspended になり、b
にも送信機会が回ってくることもある。
二つ目に、 onReceiveOrNull は channel が閉じられたあと、直ちに select されるわけでは無いと言うことだ。
Selecting to send
送信のための selecting
Select 式は onSend 節を持っており、selection の偏りのある特性と組み合わせると非常にいい働きをしてくれる。
整数の producer であって、primary channel の consumer が追いつけないときに、値を side
channel に送信するものを考えて見よう:
fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
for (num in 1..10) { // produce 10 numbers from 1 to 10
delay(100) // every 100 ms
select<Unit> {
onSend(num) {} // Send to the primary channel
side.onSend(num) {} // or to the side channel
}
}
}
コンシューマは非常に遅く、一つの整数を消費するのに 250ms かかるとする
fun main(args: Array<String>) = runBlocking<Unit> {
val side = Channel<Int>() // allocate side channel
launch(coroutineContext) { // this is a very fast consumer for the side channel
side.consumeEach { println("Side channel has $it") }
}
produceNumbers(coroutineContext, side).consumeEach {
println("Consuming $it")
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
coroutineContext.cancelChildren()
}
You can get full code here
So let us see what happens:
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
Selecting deferred values
deferred value を select する
Deferred value は onAwait 節を使って select することができる。
与えられた遅延の後に string を返却するような非同期関数を作り:
fun asyncString(time: Int) = async {
delay(time.toLong())
"Waited for $time ms"
}
ランダムな遅延を引数にその関数を1ダース作る事にする
fun asyncStringsList(): List<Deferred<String>> {
val random = Random(3)
return List(12) { asyncString(random.nextInt(1000)) }
}
そして、main 関数では最初に完了したものを待って、残りで active なままの deferred values の数を表示するようにする。
ここで利用している select
式は Kotlin DSK であるという事実により、
我々は 通常の code を利用して select に節を与えることができる。
この例では、deferred value を含んで居る list 上を走査して、それぞれの deferred value 上で onAwait
節を呼び出している。
fun main(args: Array<String>) = runBlocking<Unit> {
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
deferred.onAwait { answer ->
"Deferred $index produced answer '$answer'"
}
}
}
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
}
You can get full code here
出力はこうなる:
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
Switch over a channel of deferred values
channel producer function であって、deferred string の channel を消費し、それぞれの deferred が確定するまで待ってそれを
send するものを考える。ただし待ち時間は、次の deferred value が届くまでである。
この例は onReceiveOrNull と onAwait 節を同じ select
中で利用している:
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
val next = select<Deferred<String>?> { // return next deferred value from this select or null
input.onReceiveOrNull { update ->
update // replaces next value to wait
}
current.onAwait { value ->
send(value) // send value that current deferred has produced
input.receiveOrNull() // and use the next deferred from the input channel
}
}
if (next == null) {
println("Channel was closed")
break // out of loop
} else {
current = next
}
}
}
これをテストするために、特定の時間だけ待った後に指定した文字列を resolve する単純な async function を利用する:
fun asyncString(str: String, time: Long) = async {
delay(time)
str
}
main function は switchMapDeferreds
の結果を表示するシンプルな coroutine を起動して、テストデータを送信する::
fun main(args: Array<String>) = runBlocking<Unit> {
val chan = Channel<Deferred<String>>() // the channel for test
launch(coroutineContext) { // launch printing coroutine
for (s in switchMapDeferreds(chan))
println(s) // print each received string
}
chan.send(asyncString("BEGIN", 100))
delay(200) // enough time for "BEGIN" to be produced
chan.send(asyncString("Slow", 500))
delay(100) // not enough time to produce slow
chan.send(asyncString("Replace", 100))
delay(500) // give it time before the last one
chan.send(asyncString("END", 500))
delay(1000) // give it time to process
chan.close() // close the channel ...
delay(500) // and wait some time to let it finish
}
You can get full code here
結果は次のようになる:
BEGIN
Replace
END
Channel was closed