この記事は Kotlin Advent Calendar 15日目の記事です。
最近業務でgoでredisを使ってpub/subを実装していたのですが、kotlinだとどう書くんだろうと気になったので試してみました。
このあたりの記事を参考にしています。
- Spring MVC(+Spring Boot)上での非同期リクエストを理解する -後編(HTTP Streaming)- - Qiita
- Server-Sent Events (SSE) with Spring Boot | LinkedIn
redisのライブラリはこれを使いました。
GitHub - xetorthio/jedis: A blazingly small and sane redis java client
(pure kotlinなライブラリの決定版はまだなさそうでした)
まずはcontroller
@GetMapping(value = "/sse", produces = arrayOf("text/event-stream"))
@ResponseBody
fun sse() : SseEmitter {
return redisService.messageEmitter()
}
SprintBoot4にはSseEmitterというものがあるのでそれを返すだけです。content-typeは text/event-stream
にしてあげます。
なお、SpringBoot5ではFluxが使えるようです。
demo-spring-sse/SseController.java at master · aliakh/demo-spring-sse · GitHub
そしてRedisと接続するところ。
@Service
open class RedisService() {
private lateinit var redisClient: Jedis
private val chan = Channel<String>(2)
init {
redisClient = Jedis("localhost")
async(CommonPool) {
redisClient!!.subscribe(MyPubSub(chan), "hogeChannel")
}
}
fun messageEmitter() : SseEmitter {
val sse = SseEmitter()
ko{
for (m in chan) {
sse.send(m)
}
}
return sse
}
class MyPubSub : JedisPubSub {
private lateinit var chan : Channel<String>
constructor(ch: Channel<String>){
chan = ch
}
override fun onMessage(channel: String?, message: String?) {
println("channel : $channel, message : $message")
ko {
send(message!!)
}
}
suspend fun send(message: String){
println("send message")
chan.send(message)
}
}
}
サーバー起動時にredisでsubscribeしておきます。
subscribeはブロックするのでそのままだとサーバー起動に失敗するのでkotlinのコルーチンを使っています。
また、Jedisはsubscribe時にJedisPubSubというclassを継承したものを渡してあげる必要があるのでMyPubSubというクラスを作っています。
redisからメッセージを受け取ったときにemitterに渡すためにkotlin coroutine exampleにあるchannelを使っています。
kotlin-coroutines/examples/channel at master · Kotlin/kotlin-coroutines · GitHub
koという関数もexampleにあるgo.ktから持ってきてます。
// https://github.com/Kotlin/kotlin-coroutines/blob/master/examples/channel/go.kt
fun go(block: suspend () -> Unit) = CommonPool.runParallel(block)
これでサーバーを起動してcurlでアクセスしてからredisでpublishするとメッセージを受け取ることができます。r
curl -XGET http://localhost:8080/sse
data:hello
data:hello
data:hello
が、このままだと上手くいくのはコネクションが1つしかないときのみです。
for (m in chan) {
sse.send(m)
}
でchannelからメッセージを取り出してクライアントに返しているのですが、複数クライアントに同じメッセージを返したい場合はクライアントのコネクションを管理して全てのemitterにメッセージを送信する必要がありそうです。
まとめ
kotlin + SpringBootで割りと簡単にpub/subできました。
非同期処理もコルーチンで簡単に書けるので積極的に使っていきたいです。
(cotoutineのexampleには便利ライブラリが充実してるので標準ライブラリとして提供されると嬉しいです。)