Help us understand the problem. What is going on with this article?

Scala で WebSocket クライアント実装

More than 3 years have passed since last update.

Netty 4 の WebSocket ライブラリを使った Scala 実装サンプル。Java/Scala での WebSocket クライアント実装ってサーバ側と比べて情報が少ないですね。Netty 公式の WebSocket 実装例 を Scala で書き直したバージョンです。

Netty はイベントループのスレッドやデータ変換のパイプライン化など低レベルな操作をプログラマ自身が行えて気に入っているんですがその分コード量が多くなって泥臭いですね。毎度 ChannelHandler あたりはゴリゴリ実装しています。

build.sbt
// Project coordinates.
organization := "org.koiroha"

name := "websocket-client"

version := "1.0.0-SNAPSHOT"

// Scala compiler version used to build the sample.
scalaVersion := "2.11.7"

// Compiler warnings enabled for this build, one flag per line.
scalacOptions ++= Seq(
  "-deprecation",
  "-feature",
  "-unchecked",
  "-Xlint",
  "-Ywarn-dead-code",
  "-Ywarn-numeric-widen",
  "-Ywarn-unused",
  "-Ywarn-unused-import"
)

// The only dependency: the all-in-one Netty artifact.
libraryDependencies += "io.netty" % "netty-all" % "4.1.4.Final"
WebSocketClient.scala
// WebSocket Client Example for Scala 2.11 with Netty 4
// http://netty.io/4.0/xref/io/netty/example/http/websocketx/client/WebSocketClient.html
package org.koiroha.websocket

import io.netty.bootstrap.Bootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.{Channel,ChannelFuture,ChannelHandlerContext,ChannelInitializer,ChannelPipeline,ChannelPromise,EventLoopGroup,SimpleChannelInboundHandler}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.{DefaultHttpHeaders,FullHttpResponse,HttpClientCodec,HttpObjectAggregator}
import io.netty.handler.codec.http.websocketx.{CloseWebSocketFrame,PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame,WebSocketClientHandshaker,WebSocketClientHandshakerFactory,WebSocketFrame,WebSocketVersion}
import io.netty.handler.ssl.{SslContext,SslContextBuilder}
import io.netty.handler.ssl.util.{InsecureTrustManagerFactory,SelfSignedCertificate}
import io.netty.util.CharsetUtil

import java.io.{BufferedReader,InputStreamReader}
import java.net.URI

import scala.annotation.tailrec

/**
 * Interactive command-line WebSocket client.
 *
 * Connects to the URL given as the first command-line argument (or to
 * `ws://echo.websocket.org` when no argument is supplied), performs the
 * WebSocket handshake, and then reads standard input line by line:
 *
 *  - `quit` sends a close frame and waits until the channel is closed,
 *  - `ping` sends a ping frame,
 *  - any other line is sent as a text frame,
 *  - end of input stops the loop.
 *
 * The event loop group is shut down when the loop ends or an error is thrown.
 */
object WebSocketClient extends App {
  val WSURL = if(args.length == 0) "ws://echo.websocket.org" else args(0)

  val uri = new URI(WSURL)
  // Missing URI components fall back to "ws" and the loopback address.
  val scheme = Option(uri.getScheme).getOrElse("ws").toLowerCase
  val host = Option(uri.getHost).getOrElse("127.0.0.1")
  // URI.getPort is negative when the URL carries no explicit port; use the scheme default then.
  val port = if(uri.getPort < 0){
    scheme match {
      case "ws" => 80
      case "wss" => 443
      case _ => -1
    }
  } else uri.getPort

  if(scheme != "ws" && scheme != "wss"){
    System.err.println(s"ERROR: unsupported schema: $scheme")
    System.exit(1)
  }

  val secure = scheme == "wss"
  // NOTE(security): InsecureTrustManagerFactory accepts any server certificate without
  // verification. Acceptable for a sample; replace with a real trust manager for production use.
  val sslContext = if(secure){
    Some(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build())
  } else None

  val group = new NioEventLoopGroup()
  try {
    val handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()))
    val bootstrap = new Bootstrap()
    bootstrap.group(group)
      .channel(classOf[NioSocketChannel])
      .handler(new ChannelInitializer[SocketChannel](){
        override def initChannel(ch:SocketChannel):Unit = {
          val pipeline = ch.pipeline()
          // The TLS handler, when present, must come first in the pipeline.
          sslContext.foreach{ s => pipeline.addLast(s.newHandler(ch.alloc(), host, port)) }
          pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler)
        }
      })

    // Connect with the resolved `host` (not uri.getHost, which is null for host-less URIs)
    // so the fallback address is honoured and matches the host given to the TLS handler.
    val ch = bootstrap.connect(host, port).sync().channel()
    // Block until the handler reports the handshake result; a failure is rethrown here.
    handler.handshakeFuture.sync()

    val in = new BufferedReader(new InputStreamReader(System.in))
    @tailrec
    def _loop():Unit = {
      in.readLine() match {
        case null => ()   // end of input
        case "quit" =>
          ch.writeAndFlush(new CloseWebSocketFrame())
          // Wait for the channel to be closed before leaving the loop.
          ch.closeFuture().sync()
        case "ping" =>
          ch.writeAndFlush(new PingWebSocketFrame(Unpooled.wrappedBuffer(Array[Byte](8,1,8,1))))
          _loop()
        case msg =>
          ch.writeAndFlush(new TextWebSocketFrame(msg))
          _loop()
      }
    }
    _loop()
  } finally {
    group.shutdownGracefully()
  }

}

/**
 * Channel handler that drives the client-side WebSocket handshake and prints
 * the frames received afterwards.
 *
 * `handshakeFuture` is created when the handler is added to a pipeline. It is
 * completed successfully once the handshake finishes, and failed when an
 * exception is caught or the channel becomes inactive before the handshake is
 * done, so a caller blocked on it is always released.
 *
 * @param handshaker handshaker used to start and finish the opening handshake
 */
private class WebSocketClientHandler(handshaker:WebSocketClientHandshaker) extends SimpleChannelInboundHandler[AnyRef] {
  var handshakeFuture:ChannelPromise = _
  override def handlerAdded(ctx:ChannelHandlerContext):Unit = handshakeFuture = ctx.newPromise()
  override def channelActive(ctx:ChannelHandlerContext):Unit = handshaker.handshake(ctx.channel())
  override def channelInactive(ctx:ChannelHandlerContext):Unit = {
    System.out.println("WebSocket Client Disconnected")
    // A disconnect during the handshake would otherwise leave the promise pending forever.
    if(! handshakeFuture.isDone()){
      handshakeFuture.setFailure(new java.io.IOException("Connection closed before the WebSocket handshake completed"))
    }
  }
  override def channelRead0(ctx:ChannelHandlerContext, msg:AnyRef):Unit = {
    val ch = ctx.channel()
    if(! handshaker.isHandshakeComplete){
      // Until the handshake is complete the only valid message is the HTTP upgrade response.
      msg match {
        case res:FullHttpResponse =>
          // finishHandshake throws on an invalid response; exceptionCaught then fails the promise.
          handshaker.finishHandshake(ch, res)
          System.out.println("WebSocket Client Connected")
          handshakeFuture.setSuccess()
        case other =>
          throw new IllegalStateException(s"ERROR: Unexpected message during handshake: ${other.getClass.getName}")
      }
    } else msg match {
      case res:FullHttpResponse =>
        throw new IllegalStateException(s"ERROR: Unexpected FullHttpResponse (status=${res.status.code}, content=${res.content().toString(CharsetUtil.UTF_8)})")
      case text:TextWebSocketFrame =>
        System.out.println(s"<< ${text.text()}")
      case _:PongWebSocketFrame =>
        System.out.println("!! PONG")
      case ping:PingWebSocketFrame =>
        System.out.println("!! PING")
        // A pong must carry the same payload as the ping it answers (RFC 6455, section 5.5.3).
        // retain() is needed because SimpleChannelInboundHandler releases the inbound frame
        // after channelRead0 returns, while the write releases the buffer once it is sent.
        ch.writeAndFlush(new PongWebSocketFrame(ping.content().retain()))
      case _:CloseWebSocketFrame =>
        System.out.println("WebSocket Client Received Closing")
        ch.close()
      case other =>
        // Frame types not handled above (e.g. binary) are reported instead of raising a MatchError.
        System.out.println(s"!! Ignored unsupported message: ${other.getClass.getSimpleName}")
    }
  }
  override def exceptionCaught(ctx:ChannelHandlerContext, cause:Throwable):Unit = {
    cause.printStackTrace()
    // Release anyone waiting on the handshake if it has not finished yet.
    if(! handshakeFuture.isDone()){
      handshakeFuture.setFailure(cause)
    }
    ctx.close()
  }
}
torao@github
Sr. Software Engineer. Distributed System, Blockchain, Machine Learning, NLP, Web with Scala/Java/C/C++/Go/Python/JS. TIPメモや時事ネタのようなもの置き場。リファイン型(Wiki スタイル)のアウトプットを取るので投稿後何度も修正します。
https://hazm.at/mox/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can keep up with information across the technical fields you are interested in
  2. You can read useful information later efficiently
    By "stocking" the articles you like, you can search right away