0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Kotlin】Kotlin Coroutinesで理解するチャネル(Channel)

Posted at

概要

Kotlin の Channel は、コルーチン間で安全にデータを送受信する仕組みです。
スレッドを共有せずに「値のストリーム」をやり取りでき、Go言語のチャンネルに近い概念です。

簡単に言えば:

  • Channel = コルーチン版の「キュー」+「ストリーム」
  • 複数のコルーチン間で非同期にデータをやり取りできる

1. Channel の基本構造

ChannelSendChannel(送信側)と ReceiveChannel(受信側)の2面を持ちます。

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>()

    launch {
        for (x in 1..5) {
            channel.send(x)
            println("Sent $x")
        }
        channel.close() // 終了を明示
    }

    for (y in channel) {
        println("Received $y")
    }

    println("Done!")
}

ポイント:

  • send() は一時停止可能な関数(suspending)
  • receive() も同様にサスペンドされる
  • channel.close() により送信完了を通知
  • for (x in channel) で受信ループが簡潔に書ける

2. Channel の種類とバッファリング

Channel には4つのバッファタイプがあります。

種類 概要 挙動
Channel.RENDEZVOUS デフォルト(バッファなし) sendとreceiveがペアで同期
Channel.BUFFERED 小容量バッファ バッファが満杯ならサスペンド
Channel.CONFLATED 最新値のみ保持 古い値は破棄される
Channel.UNLIMITED 無制限バッファ メモリに依存(注意)

例:BUFFERED の挙動

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = Channel.BUFFERED)

    launch {
        repeat(5) {
            channel.send(it)
            println("Sent $it")
        }
        channel.close()
    }

    delay(1000)
    for (value in channel) {
        println("Received $value")
    }
}

BUFFERED の場合、送信が少し先行できる(非同期的)。
ただし容量制御しないとメモリを圧迫するので、Backpressure設計が重要です。


3. produce 関数による送信専用チャンネル

produce ビルダーは、送信専用チャネルを生成するための構文です。
これにより、データ生成処理を明確に分離できます。

fun CoroutineScope.produceNumbers() = produce<Int> {
    for (x in 1..5) {
        send(x)
        delay(100L)
    }
}

fun main() = runBlocking {
    val numbers = produceNumbers()
    for (n in numbers) println("Got $n")
    println("Done.")
}

produce の特徴

  • CoroutineScope 拡張関数として定義
  • 戻り値が ReceiveChannel<T>
  • scope.cancel() でチャネルも自動キャンセル

4. actor 関数による受信専用チャネル

逆に actorメッセージを受け取るコルーチン(アクター) を生成します。
メッセージ駆動アーキテクチャ(Actor Model)を構築するのに有用です。

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main() = runBlocking {
    val counter = counterActor()

    repeat(1000) { counter.send(IncCounter) }

    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")

    counter.close()
}

Actor の特徴

  • 状態(counter)を安全に管理できる
  • Mutex 不要(1スレッド内で順序的に処理)
  • メッセージ駆動・非同期タスク管理に適する

5. Channel のクローズと例外処理

明示的に close()

送信完了を知らせるために必ず呼ぶのが推奨です。

channel.close()

受信側は:

for (x in channel) {
    // 送信が完了すると自動でループ終了
}

ClosedSendChannelException / ClosedReceiveChannelException

  • 送信側が既に close() されているのに send()ClosedSendChannelException
  • 受信側が空で close() 済み → ClosedReceiveChannelException

6. 実戦設計:Producer–Consumer パターン

fun CoroutineScope.produceNumbers() = produce<Int> {
    for (x in 1..10) {
        send(x)
        delay(100)
    }
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (n in numbers) send(n * n)
}

fun main() = runBlocking {
    val numbers = produceNumbers()
    val squares = square(numbers)

    for (s in squares) println("Square: $s")

    println("Done.")
    coroutineContext.cancelChildren()
}

このパターンは以下のように分離:

  • produceNumbers → データ生成者(Producer)
  • square → 処理者(Processor)
  • for ループ → 消費者(Consumer)

7. Channel と Flow の違い

概念 Channel Flow
データ供給 明示的に送受信 (send/receive) 宣言的 (emit/collect)
設計目的 コルーチン間通信 ストリーム処理
終了処理 close() が必要 collect 終了で自動完了
再利用 1:1専用 複数収集者が利用可
適用場面 並行タスク間の通信 データストリームの変換/監視
  • Channel → 「今動いているコルーチン間のパイプ」
  • Flow → 「再利用可能なデータストリームの宣言」

まとめ

要素 説明
Channel<T> コルーチン間で値を送受信するキュー
send() / receive() サスペンド可能な送受信操作
produce 送信専用チャネルを生成
actor メッセージ受信コルーチンを生成
close() 終了通知
SupervisorScope と併用 エラー伝播を制御
Flow との違い Channelは通信、Flowは宣言的ストリーム

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?