KtorでWebsocketのテスト
Ktorで作成したwebsocketの機能をどのようにテストするのか最初わからなかったので、メモ。
バージョン
Ktor : 2.0.0
kotlin : 1.6.10
org.jetbrains.kotlin:kotlin-test-junit : 1.6.10
実装
websocketを以下のように実装した
fun Application.configureSockets() {
install(WebSockets) {
pingPeriod = Duration.ofSeconds(15)
timeout = Duration.ofSeconds(15)
maxFrameSize = Long.MAX_VALUE
masking = false
}
routing {
webSocket("/") { // websocketSession
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val text = frame.readText()
outgoing.send(Frame.Text("YOU SAID: $text"))
if (text.equals("bye", ignoreCase = true)) {
close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
}
}
}
}
}
}
}
テストコード
テストコードを以下のように書いた。
@Test
fun testWebsocket() = testApplication {
// クライアントのwebsocketを設定
val client = client.config {
install(WebSockets)
}
// テストコードやwebsocketの通信など実行
client.ws("/") {
val sendMessage = "connected"
outgoing.send(Frame.Text(sendMessage))
val receivedMessage = incoming.receive() as? Frame.Text ?: return@ws
assertEquals("YOU SAID: $sendMessage", receivedMessage.readText())
}
}