概要
― 同時実行数を制御する構造化並行性の鍵 ―
Kotlin の Semaphore
(セマフォ)は、
同時に実行できるコルーチンの数を制限するための同期プリミティブ です。
たとえば:
- 一度に3件まで API リクエストを実行したい
- 同時ダウンロード数を制限したい
- DB接続数の上限を守りたい
こうした場合に、Semaphore
が活躍します。
1. Semaphore の基本構文
Semaphore
は kotlinx.coroutines.sync.Semaphore
に定義されています。
基本構文
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
fun main() = runBlocking {
val semaphore = Semaphore(2) // 同時に2コルーチンまで実行可能
repeat(5) { i ->
launch {
semaphore.acquire() // ロックを取得
try {
println("Task $i started")
delay(1000L)
println("Task $i finished")
} finally {
semaphore.release() // ロックを解放
}
}
}
}
結果:
Task 0 started
Task 1 started
Task 0 finished
Task 1 finished
Task 2 started
Task 3 started
...
一度に2つのタスクしか実行されない。
Semaphore(2)
→ 「2つの通行証(permits)を持つゲート」。
2. acquire
/ release
と withPermit
明示的に制御する場合
semaphore.acquire()
try {
// 処理
} finally {
semaphore.release()
}
withPermit {}
を使うと安全・簡潔
val semaphore = Semaphore(3)
repeat(10) { i ->
launch {
semaphore.withPermit {
println("Running task $i")
delay(500L)
}
}
}
withPermit {}
の特徴:
- 例外が発生しても自動で
release()
が呼ばれる -
try-finally
の安全な糖衣構文
3. Semaphore と Mutex の違い
概念 | 用途 | 同時実行数 | メソッド |
---|---|---|---|
Mutex | 単一リソースの排他 | 1 |
lock() / unlock()
|
Semaphore | 同時実行数の制限 | N(任意) |
acquire() / release()
|
つまり:
-
Mutex
→ 「1人ずつ入れるトイレ」 -
Semaphore(3)
→ 「3人まで入れる会議室」
4. 実戦例:同時APIアクセス制御
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
suspend fun simulatedRequest(id: Int) {
println("→ Start request $id on ${Thread.currentThread().name}")
delay(1000L)
println("← End request $id")
}
fun main() = runBlocking {
val semaphore = Semaphore(3) // 最大3リクエスト同時
val jobs = (1..10).map { id ->
launch {
semaphore.withPermit {
simulatedRequest(id)
}
}
}
jobs.joinAll()
println("All requests completed")
}
特徴:
- 常に最大3つのリクエストが並列実行される
- 過剰な同時アクセスを防ぎ、サーバ負荷を抑えられる
5. タイムアウト付きセマフォ
一定時間内にパーミットを取得できない場合はスキップしたい、というケース。
import kotlinx.coroutines.withTimeoutOrNull
val semaphore = Semaphore(1)
val result = withTimeoutOrNull(500L) {
semaphore.withPermit {
delay(1000L)
"Done"
}
}
println(result ?: "Timeout!")
結果:
Timeout!
→ 500ms以内に処理が終わらないためキャンセルされます。
このように withTimeoutOrNull
で「待ち時間の上限」を設定可能。
6. Semaphore × Flow の組み合わせ
大量データを並行処理しつつ、同時実行数を制御したいときに便利です。
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*
suspend fun fetchData(id: Int): String {
delay(300L)
return "Data$id"
}
suspend fun main() = coroutineScope {
val semaphore = Semaphore(3)
(1..10).asFlow()
.map { id ->
async {
semaphore.withPermit {
fetchData(id)
}
}
}
.map { it.await() }
.collect { println(it) }
}
メリット:
- Flow + async + Semaphore で 「バッチ並列処理」 を安全に実装可能
- APIクエリやIOタスクを効率的にさばける
7. Semaphore と構造化並行性
Semaphore は SupervisorScope
と組み合わせることで、
キャンセル伝播を制御しつつ安全な並列タスク管理 が可能です。
supervisorScope {
val semaphore = Semaphore(2)
val jobs = List(5) { id ->
launch {
semaphore.withPermit {
try {
println("Job$id start")
delay(1000L)
if (id == 3) error("Failure in $id")
println("Job$id done")
} catch (e: Exception) {
println("Caught: ${e.message}")
}
}
}
}
jobs.joinAll()
}
効果:
- 一部タスクが失敗しても他のタスクは継続
- 構造化並行性とセマフォ制御の融合で「堅牢な並列実行」
8. Channel・Mutex・Semaphore の使い分け
機能 | Channel | Mutex | Semaphore |
---|---|---|---|
主な用途 | データの送受信 | 共有リソースの排他 | 同時実行数の制限 |
制御単位 | メッセージ | 単一アクセス | N個までのアクセス |
挙動 | キュー的 | 排他的 | 制限付き並行 |
キャンセル安全性 | 高い | 高い | 高い |
実装層 | kotlinx.coroutines.channels | kotlinx.coroutines.sync | kotlinx.coroutines.sync |
覚え方:
- Channel → 値を渡す
- Mutex → ロックをかける
- Semaphore → 並行数を制御する
まとめ
概念 | 内容 |
---|---|
Semaphore(n) |
最大 n 個のパーミットを持つ |
acquire() / release()
|
手動で取得・解放 |
withPermit {} |
自動解放付き安全構文 |
withTimeoutOrNull |
タイムアウト付きロック |
用途 | 同時実行数の制限・リソース保護 |
関連 | Mutex(1つだけ)・Channel(データフロー) |