Posted at

Scala で WebSocket クライアント実装

More than 3 years have passed since last update.

Netty 4 の WebSocket ライブラリを使った Scala 実装サンプル。Java/Scala での WebSocket クライアント実装ってサーバ側と比べて情報が少ないですね。Netty 公式の WebSocket 実装例 を Scala で書き直したバージョンです。

Netty はイベントループのスレッドやデータ変換のパイプライン化など低レベルな操作をプログラマ自身が行えて気に入っているんですがその分コード量が多くなって泥臭いですね。毎度 ChannelHandler あたりはゴリゴリ実装しています。


build.sbt

organization := "org.koiroha"

name := "websocket-client"

version := "1.0.0-SNAPSHOT"

scalaVersion := "2.11.7"

scalacOptions ++= Seq("-deprecation","-feature","-unchecked","-Xlint","-Ywarn-dead-code","-Ywarn-numeric-widen","-Ywarn-unused","-Ywarn-unused-import")

libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.1.4.Final"
)



WebSocketClient.scala

// WebSocket Client Example for Scala 2.11 with Netty 4

// http://netty.io/4.0/xref/io/netty/example/http/websocketx/client/WebSocketClient.html
package org.koiroha.websocket

import io.netty.bootstrap.Bootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.{Channel,ChannelFuture,ChannelHandlerContext,ChannelInitializer,ChannelPipeline,ChannelPromise,EventLoopGroup,SimpleChannelInboundHandler}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.{DefaultHttpHeaders,FullHttpResponse,HttpClientCodec,HttpObjectAggregator}
import io.netty.handler.codec.http.websocketx.{CloseWebSocketFrame,PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame,WebSocketClientHandshaker,WebSocketClientHandshakerFactory,WebSocketFrame,WebSocketVersion}
import io.netty.handler.ssl.{SslContext,SslContextBuilder}
import io.netty.handler.ssl.util.{InsecureTrustManagerFactory,SelfSignedCertificate}
import io.netty.util.CharsetUtil

import java.io.{BufferedReader,InputStreamReader}
import java.net.URI

import scala.annotation.tailrec

object WebSocketClient extends App {
val WSURL = if(args.length == 0) "ws://echo.websocket.org" else args(0)

val uri = new URI(WSURL)
val scheme = Option(uri.getScheme).getOrElse("ws").toLowerCase
val host = Option(uri.getHost).getOrElse("127.0.0.1")
val port = if(uri.getPort < 0){
scheme match {
case "ws" => 80
case "wss" => 443
case _ => -1
}
} else uri.getPort

if(scheme != "ws" && scheme != "wss"){
System.err.println(s"ERROR: unsupported schema: $scheme")
System.exit(1)
}

val secure = scheme == "wss"
val sslContext = if(secure){
Some(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build())
} else None

val group = new NioEventLoopGroup()
try {
val handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()))
val bootstrap = new Bootstrap()
bootstrap.group(group)
.channel(classOf[NioSocketChannel])
.handler(new ChannelInitializer[SocketChannel](){
override def initChannel(ch:SocketChannel){
val pipeline = ch.pipeline()
sslContext.foreach{ s => pipeline.addLast(s.newHandler(ch.alloc(), host, port)) }
pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler)
}
})

val ch = bootstrap.connect(uri.getHost, port).sync().channel()
handler.handshakeFuture.sync()

val in = new BufferedReader(new InputStreamReader(System.in))
@tailrec
def _loop():Unit = {
in.readLine() match {
case null => ()
case "quit" =>
ch.writeAndFlush(new CloseWebSocketFrame())
ch.closeFuture().sync()
case "ping" =>
ch.writeAndFlush(new PingWebSocketFrame(Unpooled.wrappedBuffer(Array[Byte](8,1,8,1))))
_loop()
case msg =>
ch.writeAndFlush(new TextWebSocketFrame(msg))
_loop()
}
}
_loop()
} finally {
group.shutdownGracefully()
}

}

private class WebSocketClientHandler(handshaker:WebSocketClientHandshaker) extends SimpleChannelInboundHandler[AnyRef] {
var handshakeFuture:ChannelPromise = _
override def handlerAdded(ctx:ChannelHandlerContext):Unit = handshakeFuture = ctx.newPromise()
override def channelActive(ctx:ChannelHandlerContext):Unit = handshaker.handshake(ctx.channel())
override def channelInactive(ctx:ChannelHandlerContext):Unit = System.out.println("WebSocket Client Disconnected")
override def channelRead0(ctx:ChannelHandlerContext, msg:AnyRef):Unit = {
val ch = ctx.channel()
if(! handshaker.isHandshakeComplete){
handshaker.finishHandshake(ch, msg.asInstanceOf[FullHttpResponse])
System.out.println("WebSocket Client Connected")
handshakeFuture.setSuccess()
} else msg match {
case res:FullHttpResponse =>
throw new IllegalStateException(s"ERROR: Unexpected FullHttpResponse (status=${res.status.code}, content=${res.content().toString(CharsetUtil.UTF_8)})")
case text:TextWebSocketFrame =>
System.out.println(s"<< ${text.text()}")
case pong:PongWebSocketFrame =>
System.out.println("!! PONG")
case ping:PingWebSocketFrame =>
System.out.println("!! PING")
ch.writeAndFlush(new PongWebSocketFrame(Unpooled.wrappedBuffer(Array[Byte](8,1,8,1))))
case close:CloseWebSocketFrame =>
System.out.println("WebSocket Client Received Closing")
ch.close()
}
}
override def exceptionCaught(ctx:ChannelHandlerContext, cause:Throwable):Unit = {
cause.printStackTrace()
if(! handshakeFuture.isDone()){
handshakeFuture.setFailure(cause)
}
ctx.close()
}
}