私はAndroidアプリエンジニアですが、アプリの開発では排他処理を自前で実装しなければならない状況は少ないように思います。ただ、重要な概念であるため振り返りのためにもまとめておきます。
排他制御とは
そもそも排他制御とは何で、なぜ必要なのかにさらっと触れておきます。
通常プログラミングコードでは特定のコンテキストで順次実行する処理を記述します。
シングルスレッドな環境であれば何も気にする必要はないのですが、マルチスレッド環境では同時に別の処理が実行されているため、シングルスレッドでは発生し得ない問題に対処する必要があります。
例えば、特定の変数をインクリメントするという非常にシンプルな処理であっても
hoge++
- 値を参照
- 1を加算
- 変数に代入
のような複数ステップの命令のまとまりであり、これがほぼ同時に2つのスレッドが実行した場合
- (スレッドA) 値を参照
- (スレッドB) 値を参照
- (スレッドB) 1を加算
- (スレッドB) 変数に代入
- (スレッドA) 1を加算
- (スレッドA) 変数に代入
のように、あるスレッドが値を参照してから代入が完了するまでの間に、別のスレッドによって値が読み取られる場合があります。上記の場合、2回実行されたにもかかわらず、1しかインクリメントされないという想定と異なる動作をしてしまいます。
他の処理が入り込む余地のないワンステップの処理(不可分な処理とかアトミックな処理と表現されることがあります)でない限り、こういった可能性を考える必要があります。不可分な処理とは何か、は環境によっても異なり、例えば値の代入であっても不可分とは限らない場合もあります。
ConcurrentModificationException
については遭遇したことがあるという人も多いでしょう。これはコレクションのイテレート中に内容が書き換えられた場合などで発生する例外です。コレクションの処理中にその内容が書き換えられてしまうと意図した結果にならないため例外が発生するようになっています。
このように、一連の処理中に他のスレッドによる操作が行われると破綻する可能性のある部分(クリティカルセクション)を保護するため、他のスレッドによる実行を制限する制御が排他制御です。
Object と synchronized
排他処理のコードがないと説明しにくいので、先にそちらの紹介をします。
一番シンプルな排他制御の方法は synchronized を使った方法です。
Javaでは synchronized メソッドとして実装していたものです。
synchronized void foo() {
// do something
}
Kotlinにはメソッドに対する修飾子としてsynchronizedはありませんが、代わりにアノテーションをつけることで同等の表現が可能です。
@Synchronized
fun foo() {
// do something
}
また、メソッドの中でsynchronizedセクションを作ることもできます。
Javaでは以下のように記述します。
void foo() {
synchronized (this) {
// do something
}
}
Kotlinでは以下のように記述します。
fun foo() {
synchronized(this) {
// do something
}
}
Javaでは構文であり、Kotlinではメソッドであるという違いはありますが、ほぼ同じですね。
synchronizedメソッドやアノテーションとは違って、引数を渡す必要があります。
排他するとはどういうことか?
synchronizedメソッドやsynchronizedセクションの内側が排他されます。
さて、ここまでぼかしていましたが、排他するとはどういうことでしょうか?
簡単に言うと、リソースに所有権を設定し、そのリソースの所有権を獲得したコンテキストだけが、そのリソースにアクセスできるように制限する制御のことです。
synchronizedの仕組みでは、コンテキストとはスレッドであり、対象となるリソースの種別を表現するために利用されるのが、synchronizedセクションで引数として渡しているオブジェクトです。synchronizedメソッドの場合はそのメソッドが所属するインスタンス(this)です。
Javaでは任意クラスのインスタンスを排他するリソースを表現する同期オブジェクトとして扱うことができます。
synchronizedメソッド/セクションは、そのリソースへの所有権獲得した一つのスレッドだけが実行できます。
実行時に、所有権を持っていなければ獲得しようとし、すでに別のスレッドが所有権を持っている場合は、所有権が開放され、自身が獲得できるまで実行が停止します。
用語紹介
クリティカルセクションを守るアクセス権制御機構を ロック(Lock) や ミューテックス(Mutex) と呼びます。また、リソースへのアクセスを管理する機構として セマフォ(Semaphore) というものもあり、こちらは内部カウンタの数だけリソースへのアクセスを許可するものです。カウンタが1のセマフォは バイナリセマフォ(Binary Semaphore) とも呼ばれ、ロックやミューテックスと同等の機能になります。
この排他制御の仕組みは複雑なので理解が曖昧な人も多いでしょう。いくつか例題を作ってみました。
例題1
以下のようなsynchronizedメソッドがあったとして、
class Foo {
@Synchronized
fun foo() {
// do something
}
}
Fooのインスタンスを2つ作って2つのスレッドからコールしたとき、この2つは排他されるでしょうか?
val foo1 = Foo()
val foo2 = Foo()
thread { foo1.foo() }
thread { foo2.foo() }
答え:排他されない
synchronizedメソッドの同期オブジェクトはそのインスタンスになりますので、この2つは排他されません。同時実行される可能性があります。
例題2
synchronizedメソッドを2つ持つクラスを考えます。
class Foo {
@Synchronized
fun foo1() {
// do something
}
@Synchronized
fun foo2() {
// do something
}
}
この2つのメソッドを2つのスレッドからコールしたとき、この2つは排他されるでしょうか?
val foo = Foo()
thread { foo.foo1() }
thread { foo.foo2() }
答え:排他される
メソッドは異なりますが、同期オブジェクトは共通ですので、排他されます。foo1の処理を抜けるまで、foo2が実行されることはありません。
例題3
以下のように、synchronizedメソッドからsynchronizedメソッドを呼び出そうとした場合、どうなるでしょうか?
そのまま実行できるでしょうか?それとも、獲得済みのリソースを再獲得しようとして停止するでしょうか?
@Synchronized
fun foo1() {
foo2()
}
@Synchronized
fun foo2() {
// do something
}
答え:実行される
synchronizedによる排他は再入可能(Reentrant)であるため実行されます。
再入可能(Reentrant)
synchronizedによる排他は再入可能(Reentrant)です。
再入可能とは、所有権を獲得した状態であれば、そのまま同一の所有権が必要なセクションを呼び出すことが可能であるということです。そのため同一オブジェクトが対象であれば、synchronizedメソッドからsynchronizedメソッドを呼び出しても問題無く動作します。
注意してほしいのは、排他の仕組みすべてが再入可能というわけではない点です。あくまでJava/Kotlinのsynchronizedでは再入可能というだけです。
同様の使い方をした場合に、再度そのリソースを取りにいき、自分が持っているため取得できず、開放されるまで実行が停止するが、自分が持っているため解放されることはない。と、デッドロック状態に陥るものもあります。
再入可能であればデッドロックは発生しないという意味ではありません
以下の例で、スレッドAがfoo1をスレッドBがfoo2を同時に実行した場合、スレッドAがlock1を獲得、スレッドBがlock2を獲得し、スレッドAはlock1を持ったまま、lock2が開放されるのを待ち、スレッドBはlock2を持ったまま、lock1が開放されるのを待つことになります。これはいつまでも開放されないデッドロックとなります。
fun foo1() {
synchronized (lock1) {
// do something
synchronized (lock2) {
// do something
}
}
}
fun foo2() {
synchronized (lock2) {
// do something
synchronized (lock1) {
// do something
}
}
}
条件待ちと再開通知
排他機構と切り離せない、スレッド間の調停機能として、条件待ちと再開通知があります。
例えばメッセージキューのようなものを考えた場合、キューへの書き込み、キューからの読み出しは排他が必要です。加えて、キューが空の場合に失敗させるのでは無く、データが書き込まれるまで待たせたい、というケースがあります。これは前述のsychronizedの仕組みだけでは実現できません。
JavaのObject型にはこのためのメソッドが用意されています。Kotlinでは任意のクラスはObject型のサブクラスではなくなっていますが、任意のインスタンスをObject型にキャストすることで呼び出すことができますし、同期用にObject型のインスタンスを別途用意しても良いでしょう。
メッセージキューが必要であれば既存のクラスを使うべきですが、題材として簡単に実装してみます。
class MessageQueue {
private val lock = Object()
private val deque = ArrayDeque<String>()
fun receive(): String {
synchronized(lock) {
while (deque.isEmpty()) {
lock.wait()
}
return deque.removeFirst()
}
}
fun post(message: String) {
synchronized(lock) {
deque.addLast(message)
lock.notify()
}
}
}
この例では、別途Objectのインスタンスを作成しています。
receive()
では、synchronizedで排他しておき、データがない場合は通知されるのを待つために、wait()
をコールしています。wait()
をコールするとスレッドは待ち状態となると同時にリソースの所有権を手放します。
post()
では、synchronizedで排他しておき、データを追加します。その後、notify()
をコールして、wait()
で待っているスレッドがいれば通知して再開させます。
notify()
は待ち状態のスレッド一つを再開させます。notify()
を受けるとwait()
で待っていたスレッドは、再度所有権を獲得した状態で処理が再開され、データの有無を確認した後、また空であればもう一度wait()
、空でなければ最初のデータを読み出す。という動作を行います。
notify()
は必ずしも最初に待っていたスレッドが再開されるわけではなく、任意のスレッドとなります。他にnotifyAll()
という待ち状態のスレッドすべてを再開させるメソッドもあります。用途に応じて使い分けます。
例からも分かるように、wait()
/notify()
は所有権を取得した状態で実行する必要があります。
所有権が必要である理由は、所有権の開放と不可分に実行する必要があるためです。
不可分に実行できない場合、例えば、所有権開放と待ちの間に条件が成立し、条件が成立しているにもかかわらず条件待ちを続けてしまう可能性がでてきてしまいます。
用語紹介
排他アクセス制御機構と条件待ち/通知に使用される条件変数機能を併せ持つものをモニタと呼ぶこともあります。Java/Kotlinでの任意のインスタンスはモニタとしての機能を持っています。
同期オブジェクトとしてthisを使うべきか、別途Objectを用意すべきか
synchronizedメソッドはthis
を対象に排他を行いますが、前項の例のように別途Objectを用意して利用することもできます。
this
を対象にした場合、そのインスタンスを利用しているコードでも同一リソースに対する排他処理を実装できるということが重要です。例えばCollections
のSynchronizedCollection
ではthis
を対象に各メソッドがsynchronized指定されています。この場合、set単体で利用する場合はそのままつかう一方、一連のまとまった処理の場合は、それらをこのcollectionのインスタンスを対象にsynchronizedするという使い方ができます。
以下の例の場合、add
はsynchronized
で囲んでいませんが、forEach
と排他されます。
val list = Collections.synchronizedList(mutableListOf<String>())
list.add("1")
synchronized(list) {
list.forEach {
// do something
}
}
一方で、排他処理を外部に影響を受けず実装したい場合は、別途Objectを用意し、外部から見えないようにするのがよいです。
Kotlinの場合はwait()
/notify()
メソッドへのアクセスを用意するため、this
を使う場合もキャストしたものを保持させるのが良いかもしれません。
private val lock: Object = this as Object
ReentrantLock
シンプルな排他処理はsynchronizedで実装することができますが、より高度な制御が必要な場合は、java.util.concurrent
パッケージにあるReentrantLock
を使うことができます。
synchronized による排他よりはこちらの方を使うべきなのか?というのはよくある議題ですが、synchronized では機能が不足している場合や、極めて多数のスレッドを扱うなど、性能がシビアに求められる場面でなければ synchronized で良いでしょう。
基本的な使い方は以下のようになります。
val lock = ReentrantLock()
lock.lock()
try {
// do something
} finally {
lock.unlock()
}
synchronizedと異なり、lock()
/unlock()
を明示的に呼び出します。
Lockの開放条件が複雑な場合でも実装しやすい一方、開放漏れがあると致命的な不具合となってしまうため、Javaではtry〜finallyを使って確実に開放されるよう実装するのが通例でした。KotlinではwithLock
という拡張関数が用意されています。これを使うことで、ボイラープレートコードを減らし抜け漏れを防止することができます。
lock.withLock {
// do something
}
ReentrantLockには誰が所有権を持っているかなど、状態を確認するメソッドや、tryLock()
という所有権を獲得できない場合に停止するのでは無く、失敗で抜けるメソッドなど、より細やかに同期制御するための機能が豊富に用意されています。
条件待ちと再開通知
ReentrantLock
で条件待ちを行う場合、newCondition()
で条件変数を作成しこれを利用します。synchronizedの仕組みでは同期オブジェクトとしてのインスタンスに直接メソッドがあったため、条件は一つしか使えませんでしたが、ReentrantLock
では複数の条件変数を扱うことができます。
例えば、synchronziedで説明したメッセージキューですが、キューの容量が決められている場合、送信時にも容量の上限に達している場合は、空きができるまで待機させることもしたくなります。この場合、待つ条件が空の場合とは異なるため、別の条件変数が必要になります。
class MessageQueue {
private val lock = ReentrantLock()
private val emptyCondition = lock.newCondition()
private val fullCondition = lock.newCondition()
private val deque = ArrayDeque<String>()
private val maxsize = 10
fun receive(): String {
lock.withLock {
while (deque.isEmpty()) {
emptyCondition.await()
}
val result = deque.removeFirst()
fullCondition.signal()
return result
}
}
fun post(message: String) {
lock.withLock {
while (deque.size >= maxsize) {
fullCondition.await()
}
deque.addLast(message)
emptyCondition.signal()
}
}
}
ReentrantReadWriteLock
メッセージキューは読み出しによってキューからデータを削除するため副作用(オブジェクトの変化)がありますが、副作用のない読み出し処理も多いです。
副作用のない読み出し処理を行う場合において、高頻度に複数のスレッドからアクセスされるリソースを排他する場合は、読み込みと書き込み両方を同じように排他したのでは無駄が発生します。
書き込み処理は、他の書き込み処理も、読み出し処理も行われていない状態で実行する必要がありますが、読み出し処理は、書き込み処理とは排他する必要がありますが、読み出し処理どうしの排他は不要です。
そのような場合に使える排他の仕組みとして ReentrantReadWriteLock
というものも用意されています。readLock()
/writeLock()
でそれぞれ用のLockを取得し他処理を実装するのですが、Kotlinではこれ用の拡張関数も用意されており、シンプルに記述することができます
val lock = ReentrantReadWriteLock()
lock.read {
// 読み出し処理
}
lock.write {
// 書き込み処理
}
コルーチンの排他処理
前項までで説明した排他の仕組みはスレッドの仕組みです。コルーチンではどうでしょう?
コルーチンを実行しているのはスレッドであるため、スレッドの排他の仕組みも問題無く使えます。
しかし、一部不便な面もあります。
一つは、排他処理はブロック処理であり、リソースの所有権を獲得できるまでの間、スレッドが停止することです。コルーチンの中断ではないため、シングルスレッドのディスパッチャでは全体が停止してしまいます、IOディスパッチャなどを使う必要があります。
もう一つは、クリティカルセクションは中断できない点です。コルーチンの中にクリティカルセクションを作ることはできても、そこからsuspendメソッドを呼び出すことができません。(現在のKotlinコンパイラではエラーとなります)
suspendメソッドを含む処理を排他するには別の仕組みが必要です。それがMutex
です。
Mutex
Mutex
はCoroutineの中で使える排他制御の仕組みで、ReentrantLock
と同様に、lock()
/unlock()
をコールするか
val mutex = Mutex()
mutex.lock()
try {
// do something
} finally {
mutex.unlock()
}
withLock拡張関数を使って実装します。
mutex.withLock {
// do something
}
Mutex
において所有権を獲得するコンテキストはコルーチンです。
lock
やwithLock
はsuspendメソッドであり、suspendブロックの中でコールする必要があります。
所有権が獲得できるまで処理はブロックでは無く中断します。中断なので、シングルスレッドなディスパッチャで使っても全体が止まってしまうことはありません。
ロック中にsuspendメソッドを呼び出すこともできます。中断が発生しても所有権を保持したままなので、中断を含むセクションを排他することができます。
注意すべきなのは、Mutex
は再入不可であるという点です。
ownerを引数に指定することはできますが、これは主にデバッグ用で、すでにそのownerによって獲得済みの場合、IllegalStateException
が発生します。
条件待ちと再開通知
残念ながら、コルーチンには条件待ちと再開通知の仕組みはないそうです。
必要な場合はFlow
やChannel
を使って擬似的な仕組みをつくるか、ブロック動作にしてスレッドの排他の仕組みを利用するしかありません。
Volatile
排他の仕組みとはちょっと違いますが、複数のスレッドからアクセスする仕組みとして重要なのが volatile
です。
Javaは変数宣言の修飾子でKotlinではアノテーションで指定します。
private volatile int count = 0;
@Volatile
private var count = 0
JVMではスレッドから共通のリソースへのアクセスにキャッシュが用いられることがあります。つまり、あるスレッドで値を書き換えても、別のスレッドからはしばらくの間書き換え前の値が読み出されてしまう可能性があります。
Volatileを指定すると、値の変化が他のスレッドにも即座に反映されることが保証されます。
また、一般にプログラムコードはアウトオブオーダー実行、すなわち、そのコンテキスト内の破綻がない範囲でより高速に処理できるよう、命令の順序を入れ替えながら実行されます。この並べ替えはシングルスレッドでは問題ありませんが、複数のスレッドからアクセスされるリソースまでは考慮されていません。
volatileはメモリバリア(メモリフェンス)としても機能し、volatile変数へのアクセスの前後で命令の順序が入れ替えられないことも保証します。
注意が必要なのは、可視性と実行順序を保証するだけであり、操作のアトミック性が保証される訳ではない点です。それらが必要な場合は、排他を利用します。
volatileはある意味、高速実行のための最適化処理を抑止する機能を持っているため、volatileではない変数を扱うよりもコストは高いですが、排他の仕組みほどはコストは高くありません。
排他までは不要であるが、可視性や順序を保証したい場合にvolatileを使います。
排他の仕組みは非常に難しく、基本これらを使わなくて良いように実装する。必要な場合も一から作るのでは無く、あらかじめ用意されたSDKのクラスがあるならそれを利用するのがよいです。
しかし、理解していないと再現性が低い割に重篤な不具合を発生させてしまいかねませんので、しっかりと理解したいところです。
正直自分の理解も合っているか自信がないので、誤りなどあればご指摘いただけると助かります。