LoginSignup
4
5

More than 5 years have passed since last update.

Scala で WebSocket クライアント実装

Posted at

Netty 4 の WebSocket ライブラリを使った Scala 実装サンプル。Java/Scala での WebSocket クライアント実装ってサーバ側と比べて情報が少ないですね。Netty 公式の WebSocket 実装例 を Scala で書き直したバージョンです。

Netty はイベントループのスレッドやデータ変換のパイプライン化など低レベルな操作をプログラマ自身が行えて気に入っているんですがその分コード量が多くなって泥臭いですね。毎度 ChannelHandler あたりはゴリゴリ実装しています。

build.sbt
// sbt build definition for the WebSocket client sample.
organization := "org.koiroha"

name := "websocket-client"

version := "1.0.0-SNAPSHOT"

// The sample is written against Scala 2.11.
scalaVersion := "2.11.7"

// Turn on deprecation/feature/unchecked diagnostics plus lint and the dead-code,
// numeric-widening and unused-code/unused-import warnings.
scalacOptions ++= Seq("-deprecation","-feature","-unchecked","-Xlint","-Ywarn-dead-code","-Ywarn-numeric-widen","-Ywarn-unused","-Ywarn-unused-import")

// Netty (the all-in-one artifact) is the only library dependency.
libraryDependencies ++= Seq(
  "io.netty" % "netty-all" % "4.1.4.Final"
)
WebSocketClient.scala
// WebSocket Client Example for Scala 2.11 with Netty 4
// http://netty.io/4.0/xref/io/netty/example/http/websocketx/client/WebSocketClient.html
package org.koiroha.websocket

import io.netty.bootstrap.Bootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.{Channel,ChannelFuture,ChannelHandlerContext,ChannelInitializer,ChannelPipeline,ChannelPromise,EventLoopGroup,SimpleChannelInboundHandler}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.{DefaultHttpHeaders,FullHttpResponse,HttpClientCodec,HttpObjectAggregator}
import io.netty.handler.codec.http.websocketx.{CloseWebSocketFrame,PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame,WebSocketClientHandshaker,WebSocketClientHandshakerFactory,WebSocketFrame,WebSocketVersion}
import io.netty.handler.ssl.{SslContext,SslContextBuilder}
import io.netty.handler.ssl.util.{InsecureTrustManagerFactory,SelfSignedCertificate}
import io.netty.util.CharsetUtil

import java.io.{BufferedReader,InputStreamReader}
import java.net.URI

import scala.annotation.tailrec

/**
 * Interactive command-line WebSocket client built on Netty 4.
 *
 * The target URL is taken from the first command-line argument and defaults to
 * `ws://echo.websocket.org`. Once the opening handshake has completed, every line read from
 * standard input is sent as a text frame, except for two commands:
 *
 *  - `ping` sends a Ping frame;
 *  - `quit` sends a Close frame and waits until the channel is closed.
 *
 * End of input ends the loop without sending a Close frame. The event loop group is shut down
 * when the loop ends or when any step fails.
 */
object WebSocketClient extends App {
  val WSURL = if(args.length == 0) "ws://echo.websocket.org" else args(0)

  val uri = new URI(WSURL)
  // Missing URI components fall back to a plain "ws" connection to 127.0.0.1.
  val scheme = Option(uri.getScheme).getOrElse("ws").toLowerCase
  val host = Option(uri.getHost).getOrElse("127.0.0.1")
  // URI.getPort is negative when the URL has no explicit port; use the scheme's default then.
  val port = if(uri.getPort < 0){
    scheme match {
      case "ws" => 80
      case "wss" => 443
      case _ => -1
    }
  } else uri.getPort

  if(scheme != "ws" && scheme != "wss"){
    System.err.println(s"ERROR: unsupported scheme: $scheme")
    System.exit(1)
  }

  val secure = scheme == "wss"
  // SECURITY NOTE(review): InsecureTrustManagerFactory accepts every server certificate without
  // verification. That is acceptable for a sample only; configure a real trust manager before
  // using this against anything that matters.
  val sslContext = if(secure){
    Some(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build())
  } else None

  val group = new NioEventLoopGroup()
  try {
    // WebSocket version 13 handshaker with no sub-protocol, extensions disabled and no custom headers.
    val handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()))
    val bootstrap = new Bootstrap()
    bootstrap.group(group)
      .channel(classOf[NioSocketChannel])
      .handler(new ChannelInitializer[SocketChannel](){
        override def initChannel(ch:SocketChannel):Unit = {
          val pipeline = ch.pipeline()
          // The TLS handler goes first so that the HTTP codec only ever sees decrypted bytes.
          sslContext.foreach{ s => pipeline.addLast(s.newHandler(ch.alloc(), host, port)) }
          pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler)
        }
      })

    // Connect to the resolved host (not uri.getHost, which is null when the URL has no host)
    // so the socket and the TLS handler agree on the peer.
    val ch = bootstrap.connect(host, port).sync().channel()
    // Block until the handler reports that the WebSocket handshake has finished.
    handler.handshakeFuture.sync()

    val in = new BufferedReader(new InputStreamReader(System.in))
    @tailrec
    def _loop():Unit = {
      in.readLine() match {
        case null => ()   // end of input
        case "quit" =>
          ch.writeAndFlush(new CloseWebSocketFrame())
          ch.closeFuture().sync()
        case "ping" =>
          ch.writeAndFlush(new PingWebSocketFrame(Unpooled.wrappedBuffer(Array[Byte](8,1,8,1))))
          _loop()
        case msg =>
          ch.writeAndFlush(new TextWebSocketFrame(msg))
          _loop()
      }
    }
    _loop()
  } finally {
    group.shutdownGracefully()
  }

}

/**
 * Channel handler that drives the WebSocket opening handshake and then prints incoming frames
 * to standard output.
 *
 * Until the handshake is complete, the first inbound message is expected to be the server's
 * HTTP response. Afterwards text frames are printed, Ping frames are answered with a Pong,
 * and a Close frame closes the channel.
 *
 * @param handshaker handshaker used to start and to finish the opening handshake
 */
private class WebSocketClientHandler(handshaker:WebSocketClientHandshaker) extends SimpleChannelInboundHandler[AnyRef] {

  /** Completed when the handshake succeeds or fails; assigned when the handler is added to a pipeline. */
  var handshakeFuture:ChannelPromise = _

  override def handlerAdded(ctx:ChannelHandlerContext):Unit = handshakeFuture = ctx.newPromise()

  /** Starts the opening handshake as soon as the connection is established. */
  override def channelActive(ctx:ChannelHandlerContext):Unit = {
    handshaker.handshake(ctx.channel())
    ()
  }

  /** Reports the disconnect and fails a still-pending handshake so that waiters are released. */
  override def channelInactive(ctx:ChannelHandlerContext):Unit = {
    System.out.println("WebSocket Client Disconnected")
    // No effect when the promise is already complete; otherwise a thread blocked on
    // handshakeFuture would wait forever for a connection that is already gone.
    handshakeFuture.tryFailure(new java.io.IOException("Connection closed before the WebSocket handshake completed"))
    ()
  }

  /**
   * Finishes the handshake with the first HTTP response, then dispatches WebSocket frames.
   *
   * @throws IllegalStateException if a non-HTTP message arrives before the handshake is complete,
   *                               or an HTTP response arrives after it
   */
  override def channelRead0(ctx:ChannelHandlerContext, msg:AnyRef):Unit = {
    val ch = ctx.channel()
    if(! handshaker.isHandshakeComplete){
      msg match {
        case res:FullHttpResponse =>
          // A failure thrown here reaches exceptionCaught, which fails the promise.
          handshaker.finishHandshake(ch, res)
          System.out.println("WebSocket Client Connected")
          handshakeFuture.setSuccess()
        case other =>
          throw new IllegalStateException(s"ERROR: Unexpected message before handshake completion: ${other.getClass.getName}")
      }
    } else msg match {
      case res:FullHttpResponse =>
        throw new IllegalStateException(s"ERROR: Unexpected FullHttpResponse (status=${res.status.code}, content=${res.content().toString(CharsetUtil.UTF_8)})")
      case text:TextWebSocketFrame =>
        System.out.println(s"<< ${text.text()}")
      case _:PongWebSocketFrame =>
        System.out.println("!! PONG")
      case ping:PingWebSocketFrame =>
        System.out.println("!! PING")
        // RFC 6455 requires the Pong to carry the Ping's application data. retain() keeps the
        // buffer alive for the write, because SimpleChannelInboundHandler releases the inbound
        // message once this method returns.
        ch.writeAndFlush(new PongWebSocketFrame(ping.content().retain()))
      case _:CloseWebSocketFrame =>
        System.out.println("WebSocket Client Received Closing")
        ch.close()
      case other =>
        // Binary/continuation frames are not handled by this sample; report them instead of
        // letting a MatchError tear the connection down.
        System.out.println(s"!! Unsupported frame: ${other.getClass.getName}")
    }
  }

  /** Prints the error, fails a still-pending handshake and closes the connection. */
  override def exceptionCaught(ctx:ChannelHandlerContext, cause:Throwable):Unit = {
    cause.printStackTrace()
    // tryFailure is a no-op when the promise has already been completed.
    handshakeFuture.tryFailure(cause)
    ctx.close()
  }
}
4
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
5