About
KotlinでFIFOな直列処理キューを実装したくなり、せっかくなのでコルーチン(kotlinx-coroutines-core)を使って実装してみました。
イメージとしては、main以外のシングルスレッドでEventLoopが回っていて、Queueに溜まった処理を直列に処理していくというものです。
Channelによる実装
最初、EventLoopという、それらしいCoroutineDispatcher
を発見したのですが、いまいち使い方が分からず、ゴニョゴニョしているうちにChannel
を使った実装に落ち着きました。
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
class SerializedOperationQueue(name: String = "EventLoop", capacity: Int = 0) {
private val singleThreadContext = newSingleThreadContext(name)
private val queue = Channel<suspend () -> Unit>(capacity).also {
launch(singleThreadContext) {
for (operation in it) {
operation.invoke()
}
}
}
fun push(operation: suspend () -> Unit) = launch(Unconfined) {
queue.send(operation)
}
}
Actorで書き直し
もう少し調べてみると、Channel
を内包したActorなるものを発見したので、書き直してみました。
もはやラップする必要も無い程のシンプルさですね。。
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.actor
class SerializedOperationQueue(name: String = "EventLoop", capacity: Int = 0) {
private val singleThreadContext = newSingleThreadContext(name)
private val actor = actor<suspend () -> Unit>(singleThreadContext, capacity) {
for (operation in channel) {
operation.invoke()
}
}
fun push(operation: suspend () -> Unit) = launch(Unconfined) {
actor.send(operation)
}
}
実行
いずれの実装でも、次のような使い方になります。
fun main(args : Array<String>) = runBlocking {
val operationQueue = SerializedOperationQueue()
listOf(0, 1, 2, 3, 4).forEach {
operationQueue.push {
delay((5 - it) * 1000L)
println("Job[$it]@${Thread.currentThread().name}")
}
println("Push[$it]@${Thread.currentThread().name}")
}
delay(20_000L)
}
実行してみると、Pushされた順に処理が実行されていることが分かります。
Push[0]@main
Push[1]@main
Push[2]@main
Push[3]@main
Push[4]@main
Job[0]@EventLoop
Job[1]@EventLoop
Job[2]@EventLoop
Job[3]@EventLoop
Job[4]@EventLoop
知れば知るほど楽しいコルーチンの世界...