Kotlin
spring-boot
PubSub
KotlinDay 15

kotlin + SpringBootでPub/Sub + Server-Sent Event

この記事は Kotlin Advent Calendar 15日目の記事です。

最近業務でgoでredisを使ってpub/subを実装していたのですが、kotlinだとどう書くんだろうと気になったので試してみました。

このあたりの記事を参考にしています。

redisのライブラリはこれを使いました。
GitHub - xetorthio/jedis: A blazingly small and sane redis java client
(pure kotlinなライブラリの決定版はまだなさそうでした)

まずはcontroller

    @GetMapping(value = "/sse", produces = arrayOf("text/event-stream"))
    @ResponseBody
    fun sse() : SseEmitter {
        return redisService.messageEmitter()
    }

SprintBoot4にはSseEmitterというものがあるのでそれを返すだけです。content-typeは text/event-streamにしてあげます。
なお、SpringBoot5ではFluxが使えるようです。
demo-spring-sse/SseController.java at master · aliakh/demo-spring-sse · GitHub

そしてRedisと接続するところ。

@Service
open class RedisService() {
    private lateinit var redisClient: Jedis
    private val chan = Channel<String>(2)
    init {
        redisClient = Jedis("localhost")

        async(CommonPool) {
            redisClient!!.subscribe(MyPubSub(chan), "hogeChannel")
        }

    }

    fun messageEmitter() : SseEmitter {
        val sse = SseEmitter()
        ko{
            for (m in chan) {
                sse.send(m)
            }
        }
        return sse
    }

    class MyPubSub : JedisPubSub {
        private lateinit var chan : Channel<String>
        constructor(ch: Channel<String>){
            chan = ch
        }
        override fun onMessage(channel: String?, message: String?) {
            println("channel : $channel, message : $message")
            ko {
                send(message!!)
            }
        }

        suspend fun send(message: String){
            println("send message")
            chan.send(message)
        }
    }
}

サーバー起動時にredisでsubscribeしておきます。
subscribeはブロックするのでそのままだとサーバー起動に失敗するのでkotlinのコルーチンを使っています。

また、Jedisはsubscribe時にJedisPubSubというclassを継承したものを渡してあげる必要があるのでMyPubSubというクラスを作っています。

redisからメッセージを受け取ったときにemitterに渡すためにkotlin coroutine exampleにあるchannelを使っています。
kotlin-coroutines/examples/channel at master · Kotlin/kotlin-coroutines · GitHub

koという関数もexampleにあるgo.ktから持ってきてます。

// https://github.com/Kotlin/kotlin-coroutines/blob/master/examples/channel/go.kt
fun go(block: suspend () -> Unit) = CommonPool.runParallel(block)

これでサーバーを起動してcurlでアクセスしてからredisでpublishするとメッセージを受け取ることができます。r

curl -XGET http://localhost:8080/sse
data:hello

data:hello

data:hello

が、このままだと上手くいくのはコネクションが1つしかないときのみです。

for (m in chan) {
    sse.send(m)
}

でchannelからメッセージを取り出してクライアントに返しているのですが、複数クライアントに同じメッセージを返したい場合はクライアントのコネクションを管理して全てのemitterにメッセージを送信する必要がありそうです。

まとめ

kotlin + SpringBootで割りと簡単にpub/subできました。
非同期処理もコルーチンで簡単に書けるので積極的に使っていきたいです。
(cotoutineのexampleには便利ライブラリが充実してるので標準ライブラリとして提供されると嬉しいです。)