ソース記事はこちら
共有の可変状態を含むコードを書くのは、とても難しくエラーを起こしやすいことが知られている。(このチュートリアルでさえ、コールバックを使用するソリューションを実装するかたわら、遭遇した) 共有の可変状態を使う情報の共有の代わりに、メッセージによる情報の共有により、これを簡単にしようとしている。コルーチンはチャネル経由でお互いにメッセージをやりとりすることができる。
チャネルは、異なるコルーチンの間でデータを渡すことを可能にする通信基本命令である。あるコルーチンはチャネルに対していくつかの情報を送ることができ、一方で他のコルーチンはチャネルからその情報を受け取ることができる。
情報を送る(生成する)コルーチンは度々、producerと呼ばれ、情報を受け取る(消費する)コルーチンはconsumerと呼ばれる。必要なときは、多くのコルーチンが同じチャネルに情報を送ることができ、多くのコルーチンがそれから情報を受け取ることができる。
留意すべきは、多くのコルーチンが同じチャネルから情報を受け取るときは、それぞれの要素はcosumerの一つによってのみ扱われる、つまり自動的に扱われるということは、チャネルからその要素を削除するという意味である。
チャネルは、要素の集合に似たものとして考えることができる。(直接的な類似物はキューであり、要素は片側から追加され、もう片方から受け取る)しかし、重要な相違点がある。集合と異なり、同期化バージョンでさえも、チャネルはsend
とreceive
操作を一時停止することがある。これは、チャネルが空、あるはフル(チャネルのサイズは制約がついているかもしれず、その場合フルがありうる)のときに発生する。
Channel
は3つの異なるインターフェイスで表される。つまりSendChannel
とReceiveChannel
と二つを拡張するChannel
である。大抵はチャネルを作り、SendChannel
インスタンスとしてproducerに割り当て、そうするとproducerはそれに送信だけができる。またconsumerにReceiveChannel
インスタンスとして割り当て、そうするとconsumerはそれから受信だけができる。留意すべきは、send
、receive
メソッドは両方ともsuspend
として宣言されているということである。
interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
producerはもはや要素が来ないことを示すためにチャネルを閉じることができる。
チャネルのいくつかのタイプはライブラリで定義されている。それらは、いくつの要素を内部的に保存できるか、そしてsend
呼び出しが一時停止できるかどうか、によって異なる。すべてのチャネルのタイプにとって、receive
呼び出しは同じようにふるまう。つまりチャネルが空でない場合は、要素を受け取り、それ以外の場合は一時停止する。
- Unlimited channel
Unlimited channelは、キューに最も近い類似物である。つまりproducerはこのチャネルへ要素を送信することができ、それは無限に増加する。send
呼び出しは決して一時停止しない。メモリが不足した場合、OutOfMemoryException
を受け取るだろう。キューとの相違点は、consumerが空のチャネルから受信を試み、いくつかの新しいデータがチャネルに送信されるまで一時停止するときに現れる。 - Buffered channel
Buffered channelのサイズは、指定した数値の制約がある。producerはサイズ制限に達するまで、このチャネルに要素を送信することができる。すべての要素は内部的に保存される。チャネルがフルの場合、send
呼び出しは、空きスペースが現れるまで、一時停止する。 - “Rendezvous” channel
“Rendezvous” channelは、バッファを持たないチャネルである。つまり、Buffered channelをゼロサイズで作るのと同じである。send
またはreceive
の関数のどちらかは、他方が呼び出されるまで常に一時停止状態になる。もしsend
関数が呼び出され、チャネルが空の場合、あるいは別の言葉でいうと、要素を送信する準備ができている一時停止状態のsend呼び出しが無い場合、receive
呼び出しが一時停止する。「Rendezvous」の名前(同意した時間と場所で会うこと)は、send
とreceive
が「定刻に会う」べきという事実に言及している。 - Conflated channel
Conflated channelに送信された新しい要素は、以前に送信された要素を上書き、receiverは常に最新の要素のみを取得する。send
呼び出しは決して一時停止しない。
チャネルを作成するとき、その型とバッファサイズ(バッファリングが必要な場合)を指定する。
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)
デフォルトでは、“Rendezvous” channelが作られる。
次の例では、2つのproducerコルーチンと1つのconsumerコルーチンの“Rendezvous” channelを作る。
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1") // ※1(suspend)
channel.send("A2") // ※8
log("A done") // ※9
}
launch {
channel.send("B1") // ※2(suspend)
log("B done") // ※10
}
launch {
repeat(3) {
val x = channel.receive() // ※3,5,7(suspend),11
log(x) // ※4,6,12
}
}
// ※以下のように出力される
// [main] A1
// [main] B1
// [main] A done
// [main] B done
// [main] A2
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
この例で何が行われているのか、より理解したい場合は、次のビデオを参照のこと。
課題
loadContributorsChannels
関数を実装し、すべてのGitHubコントリビューターを並列に要求するが、同時に中間の進捗を表示する。これらの二つの前回の関数、Request5Concurrent.kt
からloadContributorsConcurrent
とRequest6Progress.kt
からloadContributorsProgress
を使用すること。
ヒント
異なるリポジトリのためのコントリビューターを並列に受信する異なるコルーチンは、すべての受信した結果を同じチャネルに送ることができる。
val channel = Channel<List<User>>()
for (repo in repos) {
launch {
val users = ...
// ...
channel.send(users)
}
}
その後、このチャネルからの要素は、ひとつづつ受け取り、処理することができる。
repeat(repos.size) {
val users = channel.receive()
...
}
シーケンシャルにreceive
呼び出しを呼ぶため、追加の同期処理は不要である。
解法
loadContributorsProgress
関数のまま、「全コントリビューター」の一覧の中間状態を格納するallUsers
変数をつくることができる。チャネルからそれぞれの新しい一覧を受信するとき、全ユーザーの一覧へ追加し、結果を集約し、updateResults
コールバックを使って状態を更新する。
suspend fun loadContributorsChannels(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) = coroutineScope {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
val channel = Channel<List<User>>()
for (repo in repos) {
launch {
val users = service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
channel.send(users)
}
}
var allUsers = emptyList<User>()
repeat(repos.size) {
val users = channel.receive()
allUsers = (allUsers + users).aggregate()
updateResults(allUsers, it == repos.lastIndex)
}
}
留意すべきは、異なるリポジトリの結果は、準備ができるとすぐにチャネルに追加されるということである。最初に、すべての要求が送られ、データを受信していないとき、receive
呼び出しが一時停止する。この場合、"コントリビュータのロード"コルーチン全体が一時停止する。その後、ユーザーの一覧がチャネルに送られると、"コントリビュータのロード"コルーチンが復帰し、receive
呼び出しがその一覧を返却し、結果がすぐに更新される。
suspend関数を使い、コルーチンを並列に実行し、チャネルを使ってコルーチン間で情報を共有する方法について学んだ。
留意すべきは、コルーチンもチャネルも、並列性からくる複雑さを完全に根絶はしないが、複雑さを論証し、何が起きているか理解する必要があるときに、それらは明らかに人生を楽にしてくれるだろう。
次は、コルーチンを使うコードをテストする方法について議論しよう。