Posted at

Kotlin coroutinesで同時実行数制限

More than 1 year has passed since last update.


はじめに

Kotlinのcoroutinesはスレッドと比べると非常に軽量であり、Non-blocking操作をさせるだけなら100,000個を同時起動したとしても問題なく処理を行えます。

一方で、サーバに負荷がかかるからAPIをあまり同時に呼びすぎないようにしたい、など並列に実行する数を制限したいシチュエーションというのも勿論存在します。

今回の記事ではcoroutinesの同時実行数をどのように制御すればいいのかについて書きたいと思います。


前提条件

あるAPIを呼び出す関数が以下のように定義されているとします。

suspend fun apiCall(value: Int): Unit

また、HTTPリクエストはNon-blockingで、レスポンスが返るまでこのsuspending functionは中断しているとします。

(Androidのretrofitではenqueueベースに実装するとこうなります)


うまくいかないやりかた

Blockingに慣れた考えですと、「同時実行数のスレッドプールを作って仕事やらせればいい」となって以下のようなコードを書いてしまいがちです。

val context = newFixedThreadPoolContext(nThreads = 3, name = "ForAPI") // 同時に3並列までにしたい

(1..10).forEach { param -> // 10個のリクエストを投げる
launch(context) {
apiCall(param)
}
}

しかしながら、上記のコードでは10個のリクエストがたちまちサーバに投げられることになります。

今回使った3スレッドのスレッドプールはあくまでもcoroutinesの実行に使われるものであり、実行された各coroutineは apiCall() の呼び出しとともに中断してスレッドを他のcoroutinesに明け渡すからです。

この問題を回避するもっともシンプルな方法は apiCall() をBlocking呼び出しすることです。

呼び出し中に呼び出し元スレッドをブロックしていれば、スレッド数以上に同時実行されることはありません。

一方で、Non-blockingで計算資源を有効活用できることがcoroutinesの優位性の1つですので、できるならばとりたくない手法です。


Channel を使おう

coroutinesらしくNon-blockingに解決するには Channel をセマフォとして使うのが簡単です。

Channel は簡単にいえば「ブロックするかわりに中断するBlockingQueue」で、



  • suspend fun send(value: T) => Channel に空きがあれば値を送信、なければ空くまで中断する


  • suspend fun receive(): T => Channel に値があれば受信、なければ値が来るまで中断する

といった関数を提供します。

Channel を使うと以下のようなセマフォが簡単に書けます。

val semaphore = Channel<Unit>(capacity = 3) // 3つまで値を貯め込める

(1..10).forEarch { param -> // 10個のリクエストを投げる
launch(CommonPool) {
semaphore.send(Unit) // チャンネルに空きができるまで中断
apiCall(param)
semaphore.receive() // チャンネルに1つ空きをつくる
}
}

上記のコードで semaphore の空き容量は3なので、起動した10個のcoroutinesのうち、最初の3つは semaphore へ値を送ることができ、後続の apiCall() が実行されます。

他のcoroutinesは semaphore に空きがないため、 send の呼び出しを中断します。

apiCall() の呼び出しが完了したcoroutineは semaphore から値を受信することでチャンネルの空きをつくります。

すると中断していたcoroutineが send から再開し、後続の apiCall() を呼び出し・・・と続いていきます。

このようにすれば、最大3並列に同時実行数を制限しながらNon-blockingに処理を行うことができます。


まとめ


  • coroutinesはNon-blockingがいいところだから、なるべくブロックしないように書こう


  • Channel はブロックの代わりに中断してくれる特性が便利

  • 同時実行数制限は Channel セマフォで