Posted at
ScalaDay 20

Finagleでサーバープログラミング

More than 3 years have passed since last update.

この記事はScala Advent Calendar 2014の20日目です。

よろしくお願いします。


はじめに

latencyが短く、かつ大規模アクセスにも耐えられるサーバー環境の構築が必要になる場合があります。

例えば、広告系のシステムだと入札/応札するようなサーバーでは100msとか50msとか以内にレスポンスを返さなければいけません。

このようなサーバーを書くために、Scalaではいくつか選択肢があるかと思います。

自分の会社では、その中でFinagleを使ってAdServerを書いています。

この記事ではFinagleの簡単な紹介と、Finagleがサーバーをどのように動かしているのかについて書きたいと思います。


Finagle

Finagleは、Twitterが開発したOSSのRPCシステムです(御存知の通り)。こちらにもあるように、RPCライブラリそのものではありません。

裏側にはNettyがラップされています(現在はNetty3)。

システム間の連携を非同期かつ様々なプロトコルを扱いつつ実現するという問題を、FinagleはFutureとServiceという抽象化によって解決しています(ちなみにFutureはtwitter.util.Futureです)。

abstract class Service[-Req, +Rep] extends (Req => Future[Rep]) 

ServiceはあるReq型のリクエストを受け取り、Futureに包んでRep型のResponseを返す、と。

Serviceが返したFutureをflatMap等で続けていくことで処理の流れを高階関数のように書くことができます。

def fetchUrl(url: String): Future[Array[Byte]]

def findImageUrls(bytes: Array[Byte]): Seq[String]

val imageUrls: Future[Seq[String]] = fetchUrl("http://www.example.com") flatMap { bytes =>
val images = findImageUrls(bytes)
Future(images)
}

(参照元: Concurrent Programming with Futures)

このようにサービスの入出力を決定し、つなげていくことで、システムをある入出力の型を持った関数の様に扱うことが可能になります(Your Server as a Function)。ロジックを途中でつなぎ変えたり、サービス単位でTimeOutを設定したり、ビジネスロジックの実装も簡単に行うことができます。まさに、型はドキュメントですね。


Finagleの中

Finagleは先に述べた抽象化により、独自のプロトコルを持ったサーバーを手軽に作ることができます。

どのような実装によって抽象化が実現されているのか、以下の簡単なEchoServerを例に見てみたいと思います。

(リポジトリはこちら)


SimpleEchoServer.scala


object Main extends App {

val service = new Service[String, String] {
def apply(request: String) = Future.value(request)
}
val server = SimpleEchoServer.serve(":8080", service)
Await.result(server)
}

object SimpleEchoServer {

case class Server(
stack: Stack[ServiceFactory[String, String]] = StackServer.newStack,
params: Stack.Params = StackServer.defaultParams
) extends StdStackServer[String, String, Server] {
protected type In = String
protected type Out = String

protected def copy1(
stack: Stack[ServiceFactory[String, String]] = this.stack,
params: Stack.Params = this.params
): Server = copy(stack, params)

protected def newListener(): Listener[String, String] =
Netty3Listener(StringServerPipeline, params)

protected def newDispatcher(
transport: Transport[String, String],
service: Service[String, String]
) = new SerialServerDispatcher(transport, service)
}

val server = new Server()

def serve(port: String, service: Service[String, String]): ListeningServer =
new Server().serve(port, service)
}

object StringServerPipeline extends ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("line", new DelimiterBasedFrameDecoder(100, Delimiters.lineDelimiter: _*))
pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8))
pipeline
}
}


登場人物としては以下のとおりです。



  • Service


    • リクエストを受け取ってレスポンスへと変換するロジック




  • Transport


    • OSIトランスポート層を表現。




  • Listener


    • portにたいしてlistenし、Transportへの接続要求を提供する役割をする




  • ChannelPipeLineFactory


    • Listener内でTransportへの入出力プロトコルを処理する




  • Dispatcher


    • TransportとServiceを結ぶコンテナとして使用

    • 1接続要求に対して1つ




  • ListeningServer


    • ListenerとDispatcherを結ぶ

    • Listenerが接続要求を受け取るたびにDispatherインスタンスを生成し、内部に持つjava.util.ConcurrentHashMapにconnectionを格納




データ処理部分

流れとしては、

  ChannelPipeLine -> Transport -> Service -> Transport -> ChannelPipeLine

となります。

まず、ChannelPipeLineは以下のようにListenerに仕込まれます。

protected def newListener(): Listener[String, String] =

Netty3Listener(StringServerPipeline, params)

Listener内では、pipelineの最後に、Transport => Serviceの処理を表すServerBridgeが追加されます。


com.twitter.finagele.netty3.Netty3Liestener

def newServerPipelineFactory(statsReceiver: StatsReceiver, newBridge: () => ChannelHandler) =

new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = pipelineFactory.getPipeline()

~~

pipeline.addLast("finagleBridge", newBridge())
pipeline
}
}


TransportからServiceへの処理の流れはnewDispatcher(transport, service)でDispatcher内に渡されて...


com.twitter.finagle.StdStackServer


val underlying = listener.listen(addr) { transport =>
serviceFactory(newConn(transport)) respond {
case Return(service) =>
val d = server.newDispatcher(transport, service)
connections.add(d)
transport.onClose ensure connections.remove(d)
case Throw(_) => transport.close()
}
}

DispatcherではTransportからのデータを読んではサービスに渡して、さらにTransportにwriteするという処理を行っています。


com.twitter.finagle.dispatch.GenSerialServerDispatcher

private[this] def loop(): Future[Unit] = {

state.set(Idle)
trans.read() flatMap { req =>
val p = new Promise[Rep]
if (state.compareAndSet(Idle, p)) {
val eos = new Promise[Unit]
val save = Local.save()
try p.become(dispatch(req, eos))
finally Local.restore(save)
p map { res => (res, eos) }
} else Eof
} flatMap { case (rep, eos) =>
Future.join(handle(rep), eos).unit
} respond {
case Return(()) if state.get ne Closed =>
loop()

case _ =>
trans.close()
}
}



ネットワーク部分

Listenerでportに対してlistenし、接続要求が来たら並列で要求をキューイングしていきます。Transportの処理が返ってきたら、Dispatcherはキューからremoveします。


com.twitter.finagle.StdStackServer


val underlying = listener.listen(addr) { transport =>
serviceFactory(newConn(transport)) respond {
case Return(service) =>
val d = server.newDispatcher(transport, service)
connections.add(d)
transport.onClose ensure connections.remove(d)
case Throw(_) => transport.close()
}
}


まとめ

Finagleを使っていて、いまのところとてもいい感じです。


  • Futureのチェーン気持ちいい

  • Serviceでビジネスロジック実装しやすい

  • パフォーマンスも申し分ない

サーバー環境を新しく書こうとされている方の参考となれば幸いです。

ではみなさん、Happy Coding!!

--

補足) 先日Finagleがアップデートされ、Scala2.11対応されました。ただ、finagle-redisがまだなので心待ちにしています。