1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

コルーチンとチャネル入門#8 チャネル

Last updated at Posted at 2022-06-20

ソース記事はこちら
共有の可変状態を含むコードを書くのは、とても難しくエラーを起こしやすいことが知られている。(このチュートリアルでさえ、コールバックを使用するソリューションを実装するかたわら、遭遇した) 共有の可変状態を使う情報の共有の代わりに、メッセージによる情報の共有により、これを簡単にしようとしている。コルーチンはチャネル経由でお互いにメッセージをやりとりすることができる。
チャネルは、異なるコルーチンの間でデータを渡すことを可能にする通信基本命令である。あるコルーチンはチャネルに対していくつかの情報を送ることができ、一方で他のコルーチンはチャネルからその情報を受け取ることができる。
img1
情報を送る(生成する)コルーチンは度々、producerと呼ばれ、情報を受け取る(消費する)コルーチンはconsumerと呼ばれる。必要なときは、多くのコルーチンが同じチャネルに情報を送ることができ、多くのコルーチンがそれから情報を受け取ることができる。
img2
留意すべきは、多くのコルーチンが同じチャネルから情報を受け取るときは、それぞれの要素はcosumerの一つによってのみ扱われる、つまり自動的に扱われるということは、チャネルからその要素を削除するという意味である。
チャネルは、要素の集合に似たものとして考えることができる。(直接的な類似物はキューであり、要素は片側から追加され、もう片方から受け取る)しかし、重要な相違点がある。集合と異なり、同期化バージョンでさえも、チャネルはsendreceive操作を一時停止することがある。これは、チャネルが空、あるはフル(チャネルのサイズは制約がついているかもしれず、その場合フルがありうる)のときに発生する。
Channelは3つの異なるインターフェイスで表される。つまりSendChannelReceiveChannelと二つを拡張するChannelである。大抵はチャネルを作り、SendChannelインスタンスとしてproducerに割り当て、そうするとproducerはそれに送信だけができる。またconsumerにReceiveChannelインスタンスとして割り当て、そうするとconsumerはそれから受信だけができる。留意すべきは、sendreceiveメソッドは両方ともsuspendとして宣言されているということである。

interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
}

interface ReceiveChannel<out E> {
    suspend fun receive(): E
}    

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

producerはもはや要素が来ないことを示すためにチャネルを閉じることができる。
チャネルのいくつかのタイプはライブラリで定義されている。それらは、いくつの要素を内部的に保存できるか、そしてsend呼び出しが一時停止できるかどうか、によって異なる。すべてのチャネルのタイプにとって、receive呼び出しは同じようにふるまう。つまりチャネルが空でない場合は、要素を受け取り、それ以外の場合は一時停止する。

  • Unlimited channel
    c1
    Unlimited channelは、キューに最も近い類似物である。つまりproducerはこのチャネルへ要素を送信することができ、それは無限に増加する。send呼び出しは決して一時停止しない。メモリが不足した場合、OutOfMemoryExceptionを受け取るだろう。キューとの相違点は、consumerが空のチャネルから受信を試み、いくつかの新しいデータがチャネルに送信されるまで一時停止するときに現れる。
  • Buffered channel
    c2
    Buffered channelのサイズは、指定した数値の制約がある。producerはサイズ制限に達するまで、このチャネルに要素を送信することができる。すべての要素は内部的に保存される。チャネルがフルの場合、send呼び出しは、空きスペースが現れるまで、一時停止する。
  • “Rendezvous” channel
    c3
    “Rendezvous” channelは、バッファを持たないチャネルである。つまり、Buffered channelをゼロサイズで作るのと同じである。sendまたはreceiveの関数のどちらかは、他方が呼び出されるまで常に一時停止状態になる。もしsend関数が呼び出され、チャネルが空の場合、あるいは別の言葉でいうと、要素を送信する準備ができている一時停止状態のsend呼び出しが無い場合、receive呼び出しが一時停止する。「Rendezvous」の名前(同意した時間と場所で会うこと)は、sendreceiveが「定刻に会う」べきという事実に言及している。
  • Conflated channel
    c4
    Conflated channelに送信された新しい要素は、以前に送信された要素を上書き、receiverは常に最新の要素のみを取得する。send呼び出しは決して一時停止しない。

チャネルを作成するとき、その型とバッファサイズ(バッファリングが必要な場合)を指定する。

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

デフォルトでは、“Rendezvous” channelが作られる。
次の例では、2つのproducerコルーチンと1つのconsumerコルーチンの“Rendezvous” channelを作る。

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        channel.send("A1") // ※1(suspend)
        channel.send("A2") // ※8
        log("A done") // ※9
    }
    launch {
        channel.send("B1") // ※2(suspend)
        log("B done") // ※10
    }
    launch {
        repeat(3) {
            val x = channel.receive() // ※3,5,7(suspend),11
            log(x) // ※4,6,12
        }
    }
    // ※以下のように出力される
    // [main] A1
    // [main] B1
    // [main] A done
    // [main] B done
    // [main] A2
}

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

この例で何が行われているのか、より理解したい場合は、次のビデオを参照のこと。

課題

loadContributorsChannels関数を実装し、すべてのGitHubコントリビューターを並列に要求するが、同時に中間の進捗を表示する。これらの二つの前回の関数、Request5Concurrent.ktからloadContributorsConcurrentRequest6Progress.ktからloadContributorsProgressを使用すること。

ヒント

異なるリポジトリのためのコントリビューターを並列に受信する異なるコルーチンは、すべての受信した結果を同じチャネルに送ることができる。

val channel = Channel<List<User>>()
for (repo in repos) {
    launch {
        val users = ...
        // ...
        channel.send(users)
    }
}

その後、このチャネルからの要素は、ひとつづつ受け取り、処理することができる。

repeat(repos.size) {
    val users = channel.receive()
    ...
}

シーケンシャルにreceive呼び出しを呼ぶため、追加の同期処理は不要である。

解法

loadContributorsProgress関数のまま、「全コントリビューター」の一覧の中間状態を格納するallUsers変数をつくることができる。チャネルからそれぞれの新しい一覧を受信するとき、全ユーザーの一覧へ追加し、結果を集約し、updateResultsコールバックを使って状態を更新する。

suspend fun loadContributorsChannels(
    service: GitHubService,
    req: RequestData,
    updateResults: suspend (List<User>, completed: Boolean) -> Unit
) = coroutineScope {

    val repos = service
        .getOrgRepos(req.org)
        .also { logRepos(req, it) }
        .bodyList()

    val channel = Channel<List<User>>()
    for (repo in repos) {
        launch {
            val users = service.getRepoContributors(req.org, repo.name)
                .also { logUsers(repo, it) }
                .bodyList()
            channel.send(users)
        }
    }
    var allUsers = emptyList<User>()
    repeat(repos.size) {
        val users = channel.receive()
        allUsers = (allUsers + users).aggregate()
        updateResults(allUsers, it == repos.lastIndex)
    }
}

留意すべきは、異なるリポジトリの結果は、準備ができるとすぐにチャネルに追加されるということである。最初に、すべての要求が送られ、データを受信していないとき、receive呼び出しが一時停止する。この場合、"コントリビュータのロード"コルーチン全体が一時停止する。その後、ユーザーの一覧がチャネルに送られると、"コントリビュータのロード"コルーチンが復帰し、receive呼び出しがその一覧を返却し、結果がすぐに更新される。
suspend関数を使い、コルーチンを並列に実行し、チャネルを使ってコルーチン間で情報を共有する方法について学んだ。
留意すべきは、コルーチンもチャネルも、並列性からくる複雑さを完全に根絶はしないが、複雑さを論証し、何が起きているか理解する必要があるときに、それらは明らかに人生を楽にしてくれるだろう。
次は、コルーチンを使うコードをテストする方法について議論しよう。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?