※ソース記事はこちら
Deferred値により、コルーチンの間で、単一の値を転送する便利な方法が提供されている。チャネルによって、値のストリームを転送する方法が提供されている。
チャネルの基本
Channelは概念的には、BlockingQueue
に非常に良く似ている。一つの重要な違いは、blockingput
操作の代わりに、suspendするsendを持っており、blockingtake
の代わりにsuspendするreceiveがある。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
// これは重いCPUを消費する計算や非同期ロジックかもしれないが、単に5つの二乗値を送信する
for (x in 1..5) channel.send(x * x)
}
// ここで5つの受けとった整数を出力する
repeat(5) { println(channel.receive()) }
println("Done!")
}
このコードの出力は以下の通り。
1
4
9
16
25
Done!
チャネル上のクローズと繰り返し
キューと異なり、チャネルはこれ以上要素が来ないことを示すためにクローズすることができる。レシーバー側では、チャネルから要素を受け取るため、通常のfor
ループを使うことは便利である。
概念的に、closeは、チャネルに対して、特別なcloseトークンを送るようなものである。繰り返しはクローズトークンを受け取るとすぐに停止する。そのため、クローズを受け取る前に送られたすべての要素は保証されている。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 送信完了
}
// ここで`for`ループを使って受けっとた値を出力する(チャネルがクローズするまで)
for (y in channel) println(y)
println("Done!")
}
チャネルのProducerを組み立てる
コルーチンが一連の要素を生成するパターンは非常にありふれている。これは並列コードでよくみられるproducer-consumerパターンの一部である。そのようなproducerからパラメータとしてチャネルを取る、関数を抽出することが可能であるが、結果は関数から返却されるべきという常識に逆行する。
produceという名を持つ便利なコルーチンビルダーがあり、producer側で簡単にすぐそれをすることができ、consumeEach拡張関数で、for
ループを置き換えることができる。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
パイプライン
パイプラインは、あるコルーチンがおそらく無限に値のストリームを生成するパターンである。
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 1から始まる整数の無限のストリーム
}
そして、別のコルーチンあるいはコルーチン群が、そのストリームを消費し、いくつか処理をして、いくつかの他の結果を生成する。以下の例では、数値が単に二乗されている。
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
メインのコードは、開始して全体のパイプラインをつなげる。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val numbers = produceNumbers() // 1から以降の整数を生成する
val squares = square(numbers) // 整数を二乗する
repeat(5) {
println(squares.receive()) // 最初の5つを出力する
}
println("Done!") // 完了
coroutineContext.cancelChildren() // このコルーチン群をキャンセルする
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 1から始まる整数の無限のストリーム
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
コルーチンを作るすべての関数は、CoroutineScopeの拡張として定義されている。それにより、アプリケーションに長引くグローバルなコルーチンを確実に持たないことで、構造化された並列性に頼ることができる。
パイプラインを使った素数
コルーチンのパイプラインを使って、素数を生成する例を使って、パイプラインを極端に使ってみよう。数字の無限のシーケンスから始める。
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // startから始まる整数の無限のストリーム
}
次のパイプラインステージは、やってくる数字のストリームをフィルターする。与えられた素数で割り切れる数値をすべて削除する。
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
そうして、2から始まる数字のストリームを開始し、現在のチャネルから素数を取り出し、見つかったそれぞれの素数のために新しいパイプラインステージを起動することで、パイプラインを構築する。
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
次の例では、メインスレッドのコンテキストで全体のパイプラインを動して、最初の10個の素数を出力する。すべてのコルーチンはメインのrunBlockingコルーチンのスコープで起動するため、開始したすべてのコルーチンの明示的な一覧を維持する必要はない。cancelChildren拡張関数を使って、最初の10個の素数を出力した後で、すべての子のコルーチンをキャンセルする。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // startから始まる整数の無限のストリーム
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
このコードの出力は以下となる。
2
3
5
7
11
13
17
19
23
29
留意すべきは、標準ライブラリからiteratorコルーチンビルダーを使って同じパイプラインを構築することができる。produce
をiterator
に、send
をyield
に、receive
をnext
に、ReceiveChannel
をIterator
に置き換え、コルーチンスコープを除く。runBlocking
も必要ないだろう。しかし、上で見たチャネルを使うパイプラインのメリットは、Dispatchers.Default
コンテキストで動作させる場合、複数のCPUコアを使うことができるということである。
とにかくこれは素数を調べるための極端に非実用的な方法である。実際はパイプラインは、いくつかの他のsuspend呼び出し(外部サービスの非同期呼び出しのようなもの)を含み、これらのパイプラインはsequence
やiterator
を使って構築することはできない。なぜなら、それらは完全に非同期であるproduce
と違い、任意のsuspend処理を許可しないからである。
散開
複数のコルーチンは、同じチャネルから受け取り、それら自身の間で処理を分散するかもしれない。定期的に整数(1秒につき10個の数)を生成するproducerコルーチンから始めよう。
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // 1から開始
while (true) {
send(x++) // 次を生成
delay(100) // 0.1s待機
}
}
その後、いくつかのprocessorコルーチンがある。この例では、そのIDと受け取った数を出力するだけである。
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
それでは、5つのprocessorを起動し、ほぼ1秒処理をさせよう。何が起こるかご覧いただきたい。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // producerコルーチンをキャンセルし、すべてをkillする
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // 1から開始
while (true) {
send(x++) // 次を生成
delay(100) // 0.1s待機
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
留意すべきは、producerコルーチンをキャンセルすることは、そのチャネルをクローズし、したがって結果的にprocessorコルーチンが行っているチャネル上の繰り返しを終了させるということである。
さらにlaunchProcessor
コードで散開を実行するためにfor
ループを使ってチャネル上の繰り返しをする方法に注意を払うこと。consumeEach
と違い、for
ループのパターンは、複数のコルーチンから使うのに完全に安全である。processorコルーチンの一つが失敗した場合、ほかのものは引き続きチャネルを処理するだろう。一方、consumeEach
経由で書かれたprocessorは、正常あるいは異常終了において、配下のチャネルを常にconsume(あるいはキャンセル)する。
集結
複数のコルーチンは、同じチャネルに送るかもしれない。例えば、文字列のチャネルがあり、suspend関数が特定のディレイで、このチャネルに特定の文字列を繰り返し送るとする。
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
それでは、文字列を送る数個のコルーチンを起動した場合(この例では、メインコルーチンの子としてメインスレッドのコンテキストでそれらを起動する)、何が起こるか見てみよう。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 最初の6つを受け取る
println(channel.receive())
}
coroutineContext.cancelChildren() // メインを完了させるため、すべての子をキャンセルする
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
出力は以下の通り。
foo
foo
BAR!
foo
foo
BAR!
bufferedチャネル
今まで見てきたチャネルはバッファを持たない。バッファのないチャネルは、送信者と受信者がお互いに合うとき(別名ランデブー)、要素を送る。sendが最初に呼び出される場合、receiveが呼び出されるまでsuspendされ、receiveが最初に呼び出される場合、sendが呼び出されるまでsuspendされる。
Channel()ファクトリ関数とproduceビルダーは両方とも、バッファサイズを指定するため、オプションのcapacity
パラメータを持つ。バッファにより、送信者はsuspendする前に複数の要素をsendすることができ、指定した世量を持つBlockingQueue
に似て、バッファがフルになるとブロックする。
次のコードのふるまいを見てみる。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(4) // bufferedチャネルを作る
val sender = launch { // 送信コルーチンを起動する
repeat(10) {
println("Sending $it") // それぞれの要素を送る前に出力する
channel.send(it) // バッファがフルになるとsuspendする
}
}
// 何も受け取らない..ただ待機する
delay(1000)
sender.cancel() // 送信コルーチンをキャンセルする
}
4の容量を持つbufferedチャネルを使って、5回"sending"を出力する。
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
最初の4つの要素はバッファに追加され、送信者は5つめをsendしようとするとき、suspendする。
チャネルは公平
チャネルに対するsendとreceiveの操作は、複数のコルーチンからの呼び出し順序という点で、公平である。それらはファースト・イン・ファーストアウトの順序で提供され、例えば、receive
を呼び出した最初のコルーチンが要素を受け取る。次の例で、"ping"と"pong"の二つのコルーチンが、共有の"table"チャネルから"ball"オブジェクトを受信している。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // 共有のテーブル
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // ボールをサーブする
delay(1000) // 1秒ディレイ
coroutineContext.cancelChildren() // ゲームオーバー、キャンセルする
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // ループの中でボールを受け取る
ball.hits++
println("$name $ball")
delay(300) // 少し待機する
table.send(ball) // ボールを送り返す
}
}
"ping"コルーチンが最初に開始し、そのためボールを受け取る最初のものとなる。たとえ"ping"コルーチンは、tableに対して送り返した後に、すぐballをreceiveし始めるとしても、ballは"pong"コルーチンによってreceiveされることになる。なぜなら、それがすでに待っているからである。
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
留意すべきは、ときどきコルーチンは、使われているexecutorの性質のために、不公平に見える実行を生成するかもしれない。詳細はこの問題を参照のこと。
tickerチャネル
tickerチャネルは、そのチャネルからの最後の消費から、与えられたディレイの経過ごと、Unit
を生成する特別なランデブーチャネルである。それは使い道の無い単体に思えるかもしれないが、ウィンドウ機能や、時間依存の処理をする、複雑な時間ベースのproduceパイプラインと演算子を作るために使いやすい基礎的な要素である。tickerチャネルは、"on tick"アクションを実行するために、select内で使われる。
そのようなチャネルを作るには、tickerファクトリーメソッドを使う。これ以上要素がないことを示すには、そこでReceiveChannel.cancelメソッドを使う必要がある。
実際に動作させる方法を見てみよう。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // tickerチャネルを作る
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // 初期ディレイはなし※ticker(0ms)を受け取る
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // すべての後続の要素は100msディレイを持つ ※受け取れない
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } // ※ticker(100ms)を受け取る
println("Next element is ready in 100 ms: $nextElement")
// 大きな消費ディレイをエミュレーションする
println("Consumer pauses for 150ms")
delay(150)
// 次の要素はすぐに有効である。
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } // ※ticker(200ms)を受け取る
println("Next element is available immediately after large consumer delay: $nextElement")
// 留意すべきは、`receive`呼び出しの間での停止が考慮され、次の要素がより早く到着する。
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } // ※ticker(300ms)を受け取る
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // もう要素が必要ないことを示す
}
次の行を出力する。
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
留意すべきは、tickerはconsumerのありうる停止に気づいており、もし停止が発生する場合、デフォルトで次の生成のディレイを調整し、生成された要素の固定割合を維持しようとする。
オプションとして、TickerMoe.FIXED_DELAYに等しいmode
パラメータを指定して、要素間の固定ディレイを維持することができる。