17
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

KotlinのコルーチンでFIFOな直列処理キューを実装

Posted at

About

KotlinでFIFOな直列処理キューを実装したくなり、せっかくなのでコルーチン(kotlinx-coroutines-core)を使って実装してみました。

イメージとしては、main以外のシングルスレッドでEventLoopが回っていて、Queueに溜まった処理を直列に処理していくというものです。

Channelによる実装

最初、EventLoopという、それらしいCoroutineDispatcherを発見したのですが、いまいち使い方が分からず、ゴニョゴニョしているうちにChannelを使った実装に落ち着きました。

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel

class SerializedOperationQueue(name: String = "EventLoop", capacity: Int = 0) {
    private val singleThreadContext = newSingleThreadContext(name)
    private val queue = Channel<suspend () -> Unit>(capacity).also {
        launch(singleThreadContext) {
            for (operation in it) {
                operation.invoke()
            }
        }
    }

    fun push(operation: suspend () -> Unit) = launch(Unconfined) {
        queue.send(operation)
    }
}

Actorで書き直し

もう少し調べてみると、Channelを内包したActorなるものを発見したので、書き直してみました。

もはやラップする必要も無い程のシンプルさですね。。

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.actor

class SerializedOperationQueue(name: String = "EventLoop", capacity: Int = 0) {
    private val singleThreadContext = newSingleThreadContext(name)
    private val actor = actor<suspend () -> Unit>(singleThreadContext, capacity) {
        for (operation in channel) {
            operation.invoke()
        }
    }

    fun push(operation: suspend () -> Unit) = launch(Unconfined) {
        actor.send(operation)
    }
}

実行

いずれの実装でも、次のような使い方になります。

fun main(args : Array<String>) = runBlocking {
    val operationQueue = SerializedOperationQueue()

    listOf(0, 1, 2, 3, 4).forEach {
        operationQueue.push {
            delay((5 - it) * 1000L)
            println("Job[$it]@${Thread.currentThread().name}")
        }
        println("Push[$it]@${Thread.currentThread().name}")
    }

    delay(20_000L)
}

実行してみると、Pushされた順に処理が実行されていることが分かります。

Push[0]@main
Push[1]@main
Push[2]@main
Push[3]@main
Push[4]@main
Job[0]@EventLoop
Job[1]@EventLoop
Job[2]@EventLoop
Job[3]@EventLoop
Job[4]@EventLoop

知れば知るほど楽しいコルーチンの世界...

17
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?