概要
Kotlin の Channel は、コルーチン間で安全にデータを送受信する仕組みです。
スレッドを共有せずに「値のストリーム」をやり取りでき、Go言語のチャンネルに近い概念です。
簡単に言えば:
Channel= コルーチン版の「キュー」+「ストリーム」- 複数のコルーチン間で非同期にデータをやり取りできる
1. Channel の基本構造
Channel は SendChannel(送信側)と ReceiveChannel(受信側)の2面を持ちます。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) {
channel.send(x)
println("Sent $x")
}
channel.close() // 終了を明示
}
for (y in channel) {
println("Received $y")
}
println("Done!")
}
ポイント:
-
send()は一時停止可能な関数(suspending) -
receive()も同様にサスペンドされる -
channel.close()により送信完了を通知 -
for (x in channel)で受信ループが簡潔に書ける
2. Channel の種類とバッファリング
Channel には4つのバッファタイプがあります。
| 種類 | 概要 | 挙動 |
|---|---|---|
Channel.RENDEZVOUS |
デフォルト(バッファなし) | sendとreceiveがペアで同期 |
Channel.BUFFERED |
小容量バッファ | バッファが満杯ならサスペンド |
Channel.CONFLATED |
最新値のみ保持 | 古い値は破棄される |
Channel.UNLIMITED |
無制限バッファ | メモリに依存(注意) |
例:BUFFERED の挙動
fun main() = runBlocking {
val channel = Channel<Int>(capacity = Channel.BUFFERED)
launch {
repeat(5) {
channel.send(it)
println("Sent $it")
}
channel.close()
}
delay(1000)
for (value in channel) {
println("Received $value")
}
}
BUFFERED の場合、送信が少し先行できる(非同期的)。
ただし容量制御しないとメモリを圧迫するので、Backpressure設計が重要です。
3. produce 関数による送信専用チャンネル
produce ビルダーは、送信専用チャネルを生成するための構文です。
これにより、データ生成処理を明確に分離できます。
fun CoroutineScope.produceNumbers() = produce<Int> {
for (x in 1..5) {
send(x)
delay(100L)
}
}
fun main() = runBlocking {
val numbers = produceNumbers()
for (n in numbers) println("Got $n")
println("Done.")
}
produce の特徴
-
CoroutineScope拡張関数として定義 - 戻り値が
ReceiveChannel<T> -
scope.cancel()でチャネルも自動キャンセル
4. actor 関数による受信専用チャネル
逆に actor はメッセージを受け取るコルーチン(アクター) を生成します。
メッセージ駆動アーキテクチャ(Actor Model)を構築するのに有用です。
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0
for (msg in channel) {
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
fun main() = runBlocking {
val counter = counterActor()
repeat(1000) { counter.send(IncCounter) }
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close()
}
Actor の特徴
- 状態(
counter)を安全に管理できる -
Mutex不要(1スレッド内で順序的に処理) - メッセージ駆動・非同期タスク管理に適する
5. Channel のクローズと例外処理
明示的に close()
送信完了を知らせるために必ず呼ぶのが推奨です。
channel.close()
受信側は:
for (x in channel) {
// 送信が完了すると自動でループ終了
}
ClosedSendChannelException / ClosedReceiveChannelException
- 送信側が既に
close()されているのにsend()→ClosedSendChannelException - 受信側が空で
close()済み →ClosedReceiveChannelException
6. 実戦設計:Producer–Consumer パターン
fun CoroutineScope.produceNumbers() = produce<Int> {
for (x in 1..10) {
send(x)
delay(100)
}
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (n in numbers) send(n * n)
}
fun main() = runBlocking {
val numbers = produceNumbers()
val squares = square(numbers)
for (s in squares) println("Square: $s")
println("Done.")
coroutineContext.cancelChildren()
}
このパターンは以下のように分離:
-
produceNumbers→ データ生成者(Producer) -
square→ 処理者(Processor) -
forループ → 消費者(Consumer)
7. Channel と Flow の違い
| 概念 | Channel | Flow |
|---|---|---|
| データ供給 | 明示的に送受信 (send/receive) |
宣言的 (emit/collect) |
| 設計目的 | コルーチン間通信 | ストリーム処理 |
| 終了処理 |
close() が必要 |
collect 終了で自動完了 |
| 再利用 | 1:1専用 | 複数収集者が利用可 |
| 適用場面 | 並行タスク間の通信 | データストリームの変換/監視 |
- Channel → 「今動いているコルーチン間のパイプ」
- Flow → 「再利用可能なデータストリームの宣言」
まとめ
| 要素 | 説明 |
|---|---|
Channel<T> |
コルーチン間で値を送受信するキュー |
send() / receive()
|
サスペンド可能な送受信操作 |
produce |
送信専用チャネルを生成 |
actor |
メッセージ受信コルーチンを生成 |
close() |
終了通知 |
SupervisorScope と併用 |
エラー伝播を制御 |
Flow との違い |
Channelは通信、Flowは宣言的ストリーム |