この記事はScala Advent Calendar 2014の20日目です。
よろしくお願いします。
はじめに
latencyが短く、かつ大規模アクセスにも耐えられるサーバー環境の構築が必要になる場合があります。
例えば、広告系のシステムだと入札/応札するようなサーバーでは100msとか50msとか以内にレスポンスを返さなければいけません。
このようなサーバーを書くために、Scalaではいくつか選択肢があるかと思います。
自分の会社では、その中でFinagleを使ってAdServerを書いています。
この記事ではFinagleの簡単な紹介と、Finagleがサーバーをどのように動かしているのかについて書きたいと思います。
Finagle
Finagleは、Twitterが開発したOSSのRPCシステムです(御存知の通り)。こちらにもあるように、RPCライブラリそのものではありません。
裏側にはNettyがラップされています(現在はNetty3)。
システム間の連携を非同期かつ様々なプロトコルを扱いつつ実現するという問題を、FinagleはFutureとServiceという抽象化によって解決しています(ちなみにFutureはtwitter.util.Future
です)。
abstract class Service[-Req, +Rep] extends (Req => Future[Rep])
ServiceはあるReq型のリクエストを受け取り、Futureに包んでRep型のResponseを返す、と。
Serviceが返したFutureをflatMap
等で続けていくことで処理の流れを高階関数のように書くことができます。
def fetchUrl(url: String): Future[Array[Byte]]
def findImageUrls(bytes: Array[Byte]): Seq[String]
val imageUrls: Future[Seq[String]] = fetchUrl("http://www.example.com") flatMap { bytes =>
val images = findImageUrls(bytes)
Future(images)
}
(参照元: Concurrent Programming with Futures)
このようにサービスの入出力を決定し、つなげていくことで、システムをある入出力の型を持った関数の様に扱うことが可能になります(Your Server as a Function)。ロジックを途中でつなぎ変えたり、サービス単位でTimeOutを設定したり、ビジネスロジックの実装も簡単に行うことができます。まさに、型はドキュメントですね。
Finagleの中
Finagleは先に述べた抽象化により、独自のプロトコルを持ったサーバーを手軽に作ることができます。
どのような実装によって抽象化が実現されているのか、以下の簡単なEchoServerを例に見てみたいと思います。
(リポジトリはこちら)
object Main extends App {
val service = new Service[String, String] {
def apply(request: String) = Future.value(request)
}
val server = SimpleEchoServer.serve(":8080", service)
Await.result(server)
}
object SimpleEchoServer {
case class Server(
stack: Stack[ServiceFactory[String, String]] = StackServer.newStack,
params: Stack.Params = StackServer.defaultParams
) extends StdStackServer[String, String, Server] {
protected type In = String
protected type Out = String
protected def copy1(
stack: Stack[ServiceFactory[String, String]] = this.stack,
params: Stack.Params = this.params
): Server = copy(stack, params)
protected def newListener(): Listener[String, String] =
Netty3Listener(StringServerPipeline, params)
protected def newDispatcher(
transport: Transport[String, String],
service: Service[String, String]
) = new SerialServerDispatcher(transport, service)
}
val server = new Server()
def serve(port: String, service: Service[String, String]): ListeningServer =
new Server().serve(port, service)
}
object StringServerPipeline extends ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("line", new DelimiterBasedFrameDecoder(100, Delimiters.lineDelimiter: _*))
pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8))
pipeline
}
}
登場人物としては以下のとおりです。
-
Service
- リクエストを受け取ってレスポンスへと変換するロジック
-
Transport
- OSIトランスポート層を表現。
-
Listener
- portにたいしてlistenし、Transportへの接続要求を提供する役割をする
-
ChannelPipeLineFactory
- Listener内でTransportへの入出力プロトコルを処理する
-
Dispatcher
- TransportとServiceを結ぶコンテナとして使用
- 1接続要求に対して1つ
-
ListeningServer
- ListenerとDispatcherを結ぶ
- Listenerが接続要求を受け取るたびにDispatherインスタンスを生成し、内部に持つ
java.util.ConcurrentHashMap
にconnectionを格納
データ処理部分
流れとしては、
ChannelPipeLine -> Transport -> Service -> Transport -> ChannelPipeLine
となります。
まず、ChannelPipeLineは以下のようにListenerに仕込まれます。
protected def newListener(): Listener[String, String] =
Netty3Listener(StringServerPipeline, params)
Listener内では、pipelineの最後に、Transport => Service
の処理を表すServerBridgeが追加されます。
def newServerPipelineFactory(statsReceiver: StatsReceiver, newBridge: () => ChannelHandler) =
new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = pipelineFactory.getPipeline()
~~
pipeline.addLast("finagleBridge", newBridge())
pipeline
}
}
TransportからServiceへの処理の流れはnewDispatcher(transport, service)
でDispatcher内に渡されて...
val underlying = listener.listen(addr) { transport =>
serviceFactory(newConn(transport)) respond {
case Return(service) =>
val d = server.newDispatcher(transport, service)
connections.add(d)
transport.onClose ensure connections.remove(d)
case Throw(_) => transport.close()
}
}
DispatcherではTransportからのデータを読んではサービスに渡して、さらにTransportにwriteするという処理を行っています。
private[this] def loop(): Future[Unit] = {
state.set(Idle)
trans.read() flatMap { req =>
val p = new Promise[Rep]
if (state.compareAndSet(Idle, p)) {
val eos = new Promise[Unit]
val save = Local.save()
try p.become(dispatch(req, eos))
finally Local.restore(save)
p map { res => (res, eos) }
} else Eof
} flatMap { case (rep, eos) =>
Future.join(handle(rep), eos).unit
} respond {
case Return(()) if state.get ne Closed =>
loop()
case _ =>
trans.close()
}
}
ネットワーク部分
Listenerでportに対してlisten
し、接続要求が来たら並列で要求をキューイングしていきます。Transportの処理が返ってきたら、Dispatcherはキューからremoveします。
val underlying = listener.listen(addr) { transport =>
serviceFactory(newConn(transport)) respond {
case Return(service) =>
val d = server.newDispatcher(transport, service)
connections.add(d)
transport.onClose ensure connections.remove(d)
case Throw(_) => transport.close()
}
}
まとめ
Finagleを使っていて、いまのところとてもいい感じです。
- Futureのチェーン気持ちいい
- Serviceでビジネスロジック実装しやすい
- パフォーマンスも申し分ない
サーバー環境を新しく書こうとされている方の参考となれば幸いです。
ではみなさん、Happy Coding!!
--
補足) 先日Finagleがアップデートされ、Scala2.11対応されました。ただ、finagle-redisがまだなので心待ちにしています。