※ソース記事はこちら
コルーチンはDispatchers.Defaultのようなマルチスレッドのディスパッチャーを使うことで、平行に実行することができる。それにより、すべてのいつもの並列処理の問題が引き起こされる。コルーチンの領域でのこの問題に対するいくつかの解法は、マルチスレッドの世界の解法に似ているものもあるが、独特なものもある。
問題
すべてが同じ動作を1000回行う100個のコルーチンを起動してみよう。さらに後で比較するためにその完了までの時間も計測する。
マルチスレッドのDispatchers.Defaultを使い、共有可変な変数を増加させる非常に簡単な動作から始める。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // それぞれのコルーチンが動作を繰り返す回数
val time = measureTimeMillis {
coroutineScope { // コルーチン群のスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
結果として何が出力されるか? これまで"Counter = 100000"が出力される可能性はかなり低い。なぜなら100個のコルーチンがまったく同期なしで複数のスレッドから並列にcounter
を増加しているためである。
volatileは役に立たない
変数をvolatile
にすることで並列性の問題を解決するというありふれた誤解がある。試してみよう。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // それぞれのコルーチンが動作を繰り返す回数
val time = measureTimeMillis {
coroutineScope { // コルーチン群のスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
@Volatile // Kotlinでは`volatile`はアノテーションである
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
このコードはより遅く動作するが、結果として "Counter = 100000"はまだ得られない。なぜならvolatile変数は、対応する変更への線形的な(これは"原子的"の専門用語である)読み書きを保証するが、より広い動作(このケースは増加させること)の原子性は提供しないためである。
スレッドセーフなデータ構造
スレッドとコルーチン両方で作用する一般的な解法は、共有状態で実行する必要がある該当の操作のために必要なすべての同期化を提供する、スレッドセーフな(別名synchronizedまたはlinearizableまたはatomicな)データ構造を使うことである。簡単なカウンターの例では、原子的なincrementAndGet
操作を持つ、AtomicInteger
を使うことができる。
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // それぞれのコルーチンが動作を繰り返す回数
val time = measureTimeMillis {
coroutineScope { // コルーチン群のスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
これがこの特有の問題のための、最速の解法である。これは平凡なカウンター、コレクション、キューとその他の標準データ構造とそれらへの操作用に動作する。しかしこれは、すぐに使えるスレッドセーフな実装を持たない複雑な状態や複雑な操作に簡単には合わない。
粒度の高いスレッド束縛
スレッド束縛は、共有可変状態の問題に対するアプローチであり、そこでは、ある共有状態へのすべてのアクセスは、単一のスレッドに限定される。これは典型的にはUIアプリケーションで使われ、そこではすべてのUI状態は、単一のイベントディスパッチャー/アプリケーションスレッドに制限される。シングルスレッドコンテキストを使うことにより、コルーチンに適用することが容易である。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // それぞれのコルーチンが動作を繰り返す回数
val time = measureTimeMillis {
coroutineScope { // コルーチン群のスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// それぞれの増加をシングルスレッドコンテキストに限定する
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
このコードは非常に遅い。なぜなら、粒度の高いスレッド束縛だからである。それぞれの個々の増加は、マルチスレッドのDispatchers.Defaultコンテキストから、withContext(counterContext)を使って、シングルスレッドのコンテキストに切り替えている。
粒度の低いスレッド束縛
実際は、スレッド束縛は大きな塊で実行される。例えば大きな、状態を更新するビジネスロジックがシングルスレッドに制限されるなどである。次の例では、そのように行い、最初からシングルスレッドのコンテキストでそれぞれのコルーチンを実行する。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // それぞれのコルーチンが動作を繰り返す回数
val time = measureTimeMillis {
coroutineScope { // コルーチン群のスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// そべてをシングルスレッドコンテキストに制限する
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
これはかなり早く動作し、正しい結果を出力する。
相互排他
問題に対する相互排他の解法は、共有状態のすべての変更者をクリティカルセクションで保護し、決して並列に実行されないようにする。ブロッキングの世界では、それに対してよく、synchronized
やReentrantLock
が使われる。コルーチンの選択肢は、Mutexと呼ばれる。それはクリティカルセクションを区切るため、lockとunlock関数を持つ。重要な違いは、Mutex.lock()
は、suspend関数だということである。それはスレッドをブロックしない。
mutex.lock(); try { ... } finally { mutex.unlock() }
パターンを便利に表すwithLock拡張関数もある。
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // それぞれのコルーチンが動作を繰り返す回数
val time = measureTimeMillis {
coroutineScope { // コルーチン群のスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// lockでそれぞれの増加を保護する
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
この例のロック処理は、粒度が低いため、代償を払う。しかし、定期的に絶対、ある共有状態を変更しなければならないが、状態を制限するための通常のスレッドが無い、いくつかの状況のためには良い選択肢である。
アクター
アクターは、コルーチンと、そのコルーチンに制限され、カプセル化された状態と、他のコルーチンと通信するチャネルの組み合わせから成り立つエンティティである。簡単なアクターは関数として書くことができるが、複雑な状態を持つアクターは、クラスがより適している。
actorがあり、それにより、便利にアクターのメールボックスチャネルをメッセージを受け取るためのスコープに結合し、送信チャネルを結果のジョブオブジェクトに結合する。そのため、単一のactorへの参照を取っ手として持ち運ぶことができる。
アクターを使う最初のステップは、アクターが処理する予定のメッセージのクラスを定義することである。Kotlinのsealedクラスは、その目的にうまく適している。ここではカウンターを増加させるIncCounter
メッセージと、その値を取得するGetCounter
メッセージを持つCounterMsg
sealdクラスを定義する。後者は応答を送信する必要がある。 CompletableDeferred通信プリミティブは。将来知られる(伝達される)単一の値を表し、その目的のためにここで使う。
// counterActorのためのメッセージ型
sealed class CounterMsg
object IncCounter : CounterMsg() // カウンタを増加させる一方向のメッセージ
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 返答つきの要求
その後、actorコルーチンビルダーを使い、アクターを起動する関数を定義する。
// この関数は新しいcounterActorを起動する
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor状態
for (msg in channel) { // 受信したメッセージをイテレーションする
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
mainコードはまっすぐである。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // それぞれのコルーチンが動作を繰り返す回数
val time = measureTimeMillis {
coroutineScope { // コルーチン群のスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
// counterActorのためのメッセージ型
sealed class CounterMsg
object IncCounter : CounterMsg() // カウンタを増加させる一方向のメッセージ
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 返答つきの要求
// この関数は新しいcounterActorを起動する
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor状態
for (msg in channel) { // 受信したメッセージをイテレーションする
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
fun main() = runBlocking<Unit> {
val counter = counterActor() // アクターを作る
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// アクターからカウンター値を取得sるうためのメッセージを送る
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // アクターをシャットダウンする
}
アクター自体がどのコンテキストで実行されるかは、(正しさのために)重要ではない。アクターはコルーチンであり、コルーチンはシーケンシャルに実行される。そのため特定のコルーチンに対して状態を束縛することで、共有可変状態の問題の解法として機能する。実際、アクターは自分のprivateな状態を変更するかもしれないが、(ロックの必要を避けるため)メッセージを通じてお互いに影響を与えることしかできない。
アクターは負荷がかかったときに、ロックよりも効果的である。なぜなら、その場合すべき仕事が常にあり、異なるコンテキストへの切り替えが全く不要だからである。