はじめに
とある非同期処理を1件ずつ順に処理したかったので、Kotlin CoroutinesのChannelを使用して実装しました。
簡略化して残しておきます。
JobQueueクラスを作成
実際に処理を積んで実行してくれるクラスを作成します。
class JobQueue {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
// Channel作成(ジェネリクスにsuspendの関数型を指定しているのでラムダ式のみ送受信します)
// Channel.BUFFEREDを指定して、積まれるJobの数に制限をかけています。
private val jobChannel = Channel<suspend () -> Unit>(Channel.BUFFERED)
init {
scope.launch {
for (job in jobChannel) {
job()
delay(1000L) // 1秒に1回Job実行させたかったのでdelay
}
}
}
fun enqueue(job: suspend () -> Unit) {
val result = jobChannel.trySend(job)
if (!result.isSuccess) {
println("キュー満杯")
}
}
追加するJobが非同期処理で、かつ同時にStartすることに不都合があったため、 delay(1000L)で1秒に1回ずつJobを実行する仕組みになっています。
enqueue(job:)
内のjobChannel.trySend(job)
でChannelに処理をつんでいます。
私の場合、Channelが万が一満杯になった場合は、処理をdropしても問題ないケースだったのでtrySendを使用して、満杯であれば失敗を返すようにしています。
代わりにsend()
を使用すると、Channelに空きが出るまで中断して、空き次第sendするようなので、状況によって使いわけが必要です。
使う側
使う側ではenqueueの引数に実行したい処理を入れるだけです。
private val jobQueue = JobQueue()
jobQueue.enqueue {
println("job1")
}
jobQueue.enqueue {
println("job2")
}
補足
ほぼほぼ自分の使用例のみの記載なので、全てのJobが実行されることを望むのか否かでjobの送り方等変わってくるかと思います。
今回はChannel.BUFFERED
にしていますが、メモリが許す限り無限にJobを送信できるChannel.UNLIMITED
もあります。
また、ログアウト時に実行中のJobのキャンセルやChannelのリセットなどしたい場合は別途ハンドリングがひつようになってきますのでご注意ください。