Edited at

finagleとthriftでmicro-servicesを作る

More than 3 years have passed since last update.

こんにちは、

今回は個人的に注目しているfinagleとthriftを使って簡単なmicro-servicesの作成を行うチュートリアルを紹介します。

finagleについてはseratchさんの翻訳、Scalaのmicro services事例はokapiesさんのエントリが詳しいと思います。


題材

日本語も検索できる全文検索サービスを作ります。POSTでデータを入れて、GETでヒットしたドキュメントを表示するといった簡単な仕様です。

twitterのscalaチュートリアルsearchbird というfinagleとthriftを使って検索エンジンを作るチュートリアルがありました。

searchbirdは更新されておらず動かない+古いということでもとに紹介したいと思います。

また、micro-servicesっぽい構成にするために無理矢理ですが、3つのサーバに分けました。


  • キャッシュ

  • 検索サーバ

  • DB

キャッシュはpathをキーにレスポンスをキャッシュします。ずっと持っててもしょうがないので10秒くらいしたら飛ばします。

検索サーバにはインデックスしか持たず、ドキュメントの本文はDBに持たせて、idを使ってDBから引くようにします。

また元のsearchbirdでは空白区切りの単語ごとの転置インデックスでしたが、日本語に対応したいのでバイグラムの転置インデックスにします。

各サーバへはThriftでつなぎます。


thrift定義

各サーバ間のインターフェイスをThrift で記述します。

まずキャッシュサーバのインターフェイスを定義します。

struct CacheResponse {

1 option string hit
2 string msg
}

service CacheService {
CacheResponse get(1: string key)

void put(1: string key, 2: string value)
}

次の節で説明しますが、これがScalaのコードに変換されます。

文字列をキーにして、一定時間文字列をキャッシュします。

getの返り値はOption[String]にしたいのですが、thriftではoption string get() のように定義できないため構造体を使います。

検索サーバのインターフェイスです。

service SearchService {

list<int> search(1: string key)

i32 put(1: string value)
}

見つかったドキュメントのidをリストにして返します。

このidはputした際にSearchServiceで決めてあげます。

DBのインターフェイスです。

service DbService {

string get(1: i32 key) (1: BaseException exp)

void put(1: i32 id, 2: string value)
}

ドキュメント本体はSearchで決めたidをキーにDBに格納します。こちらは変なIDでgetした場合は例外なのでExceptionで伝えます。

なお、Thriftの機能はこれ以外にもいろいろあります。

参考: http://thrift.apache.org/docs/idl

https://git-wip-us.apache.org/repos/asf?p=thrift.git;a=blob_plain;f=tutorial/tutorial.thrift


scrooge

Thrift は公式のScalaバインディングがないのでtwitterが作っているscroogeというツールを使います。

project/plugins.sbt

addSbtPlugin("com.twitter" %% "scrooge-sbt-plugin" % "3.16.3")

build.sbt

com.twitter.scrooge.ScroogeSBT.newSettings

thriftのコードをsrc/main/thrift に置くと、コンパイル時に同時にScalaのソースファイルがtarget/scala-2.10/src_managedに生成されます。

自動生成のソースは長いですが、重要なのはFutureのインターフェイス(FutureIface)です。

// キャッシュサーバ

object CacheService {
...
trait FutureIface extends CacheService[Future] {

def get(key: String): Future[CacheResponse]

def put(key: String, value: String): Future[Unit]
}
}

object CacheResponse {
def apply(
hit: Option[String] = None,
msg: String
): CacheResponse =
new Immutable(
hit,
msg
)
...
}

// 検索サーバ
object SearchService {
...
trait FutureIface extends SearchService[Future] {

def search(key: String): Future[Seq[Int]]

def put(value: String): Future[Int]

def delete(id: Int): Future[Unit]
}
...
}

// DB
object DbService {
...
trait FutureIface extends DbService[Future] {

def get(key: Int): Future[String]

def put(id: Int, value: String): Future[Unit]

def delete(id: Int): Future[Unit]
}
...
}

特に驚きはない感じでScalaのインターフェイスが生成されます。

これらserviceから実際のサーバとクライアントは以下のように作ります。

val cacheClient = Thrift.client.newIface[CacheService.FutureIface]("localhost:9090")

val cacheServer = Thrift.serveIface("localhost:9090", new CacheServerImpl())

serveIfaceの引数2は、XXXService.FutureIface型で自分で実装するserviceの実体を指定します。

なお、scroogeやfinagleのexampleには古い作り方で作って利しているので注意です(ドキュメントの日付に注意)。


各サーバの実装

長くなるので省略します。githubにソースを上げているのでそちらを参照してください。


twitter-server

次にHTTPのサーバ部を作ります。

twitter-server

引数処理、ロギングやヘルスチェック、トレース、プロファイルなど全部入りのライブラリで、サーバを簡単に作れるのでこれを使います。

機能 http://twitter.github.io/twitter-server/Features.html

twitter-serverやfinagle、scroogeに言えることですが、twitterのScalaライブラリは基本Scala 2.10まで、sbtも0.12までですので、

build.sbtやbuild.propertiesに明記する必要があります。

とりあえず、一番最初は管理用のHTTPサーバだけ動かします。

object HTTPServer extends TwitterServer {

def main() {

onExit {
adminHttpServer.close()
}

log.info("start admin:"+adminHttpServer.boundAddress)
Await.all(adminHttpServer,httpServer)
}
}

管理サーバのデフォルトのポートは9090で変更するには、引数にadmin.portを指定します。

$ sbt compile

$ sbt 'run-main org.github.iwag.HTTPServer -admin.port=:49090'
$ curl localhost:49990/admin
/admin/pprof/contention
/admin/metrics_graphs
/admin/pprof/profile
/admin/metrics.json
/admin/server_info
/admin/pprof/heap
/admin/contention
/abortabortabort
/admin/announcer
/admin/shutdown
/admin/clients/
/admin/logging
/admin/tracing
/admin/threads
/admin/metrics
/quitquitquit
/admin/files
/admin/ping
/admin/dtab
/health
/admin/
/admin

見ての通りいろいろな管理用のRestAPIがあります。

/health→OKとか/admin/pprof/profileでプロファイルがとれたり、コネクション数が見れたり、/admin/metrics.jsonでgcの回数とか値が取れたり、

/admin/shutdown で停止したりできます。

ただし metrics を見るにはbuild.sbtにfinagle-statsの追加が必要です。

"com.twitter"    %% "finagle-stats"         % "6.22.0",

$ curl localhost:49990/admin/metrics.json?pretty=true

{
"clnt/0.0.0.0:49092/available" : 2.0,
"clnt/0.0.0.0:49092/cancelled_connects" : 0,
"clnt/0.0.0.0:49092/closechans" : 0,
"clnt/0.0.0.0:49092/closed" : 0,
"clnt/0.0.0.0:49092/closes" : 0,
"clnt/0.0.0.0:49092/codec_connection_preparation_latency_ms.avg" : 0.0,
"clnt/0.0.0.0:49092/codec_connection_preparation_latency_ms.count" : 0,
"clnt/0.0.0.0:49092/codec_connection_preparation_latency_ms.max" : 0,
...
}

数えられないくらいのmetricsが出ます。

ちなみに、ヒープダンプを取る /admin/pprof/heap を有効にするには以下の記述が必要です(これは完全にドキュメントがなかった)。

"com.twitter"    %% "util-jvm"              % "6.22.0"

twittr-server全部便利な感じなんですがドキュメントが少ない感じでソースに当たらないとダメだしドキュメントもたいがい古いです。

twitter-serverで一番参考になったのはfinatraで、使い方がわからない場合finatraのソースを参考にするとよいです。


HTTPサーバ

とりあえず、GETリクエストに対して200 OKを返すようにします。

これはHTTPリクエストからHTTPレスポンスを返すサービスで、finagleでは以下のような型になります。

trait Service[HTTPRequest, HTTPResponse]

HTTPRequestHTTPResponseは Nettyのクラスです。が、Netty4向けの新しいhttpxではfinagle独自のRequest等が定義されるようです。

// HTTPサービスの実体、HTTPRequestからHTTPResponseを返す

class HTTPServerImpl() extends Service[HttpRequest, HttpResponse] {
override def apply(request: HttpRequest): Future[HttpResponse] = Future.value{
val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
response.setContent(copiedBuffer("OK", Utf8))
response
}
}

object HTTPServer extends TwitterServer {
// HTTPサーバにバインドするアドレス。引数から取れる
val httpAddr = flag("http", new InetSocketAddress(48080), "HTTP bind address")

def main() {
// エンドポイントに対してハンドラとしてサービスを割り当てる
val httpMux = new HttpMuxer().withHandler("/", new HTTPServerImpl())

// サーバを作成する
val httpServer = Http.serve(httpAddr(), httpMux)

onExit {
adminHttpServer.close()
httpServer.close()
}

log.info("start admin:" +adminHttpServer.boundAddress)
log.info("start http:"+ httpServer.boundAddress)
Await.all(adminHttpServer,httpServer)
}
}

$ sbt 'run-main org.github.iwag.HTTPServer -admin.port=:49090 -http=:48080'

$ curl localhost:48080/
OK%


POST

POSTして、キャッシュとDBに保存するところを作ります。

まず先ほどのHTTPサーバの実装を以下のように変更します。

class HTTPServerImpl() extends Service[HttpRequest, HttpResponse] {

override def apply(request: HttpRequest): Future[HttpResponse] = request.getMethod match {
case HttpMethod.GET => ??? // あとで
case HttpMethod.POST => post(request) // これを作る
case _ =>
Future.value(new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.BAD_REQUEST))
}
}

ここでもpostのとこだけサービスにして切り出します。

ScalaでよくあるFutureを使ったプログラミングです:Scala document

class PostService() extends Service[HttpRequest, HttpResponse] {

// DB用とSearch用のそれぞれのThriftのクライアントです
private[this] lazy val dbClient = Thrift.newIface[StoreService.FutureIface](dbAddr.getHostName+":"+dbAddr.getPort)
private[this] lazy val searchClient = Thrift.newIface[SearchService.FutureIface](searchAddr.getHostName+":"+searchAddr.getPort)

override def apply(request: HttpRequest): Future[HttpResponse] = {
val body = request.getContent.toString(Utf8)
val stored:Future[Unit] = searchClient.put(body) flatMap { i:Int => // SearchServiceにputで問い合わせて、flatMapで返ってきたidと
dbClient.put(i, body) // 文字列をdbServiceにputします
}
stored map { _ => // _:UnitだけどFutureで返したいのでmapのなかでResponseを作る
val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
response.setContent(copiedBuffer("OK", Utf8))
response
}
}
}


get

finagleはサービスやフィルタを組み合わせて新しいサービス(やフィルタ)を作ることができます。

get側はfinagleっらしくcache, searchのフィルタとdbのサービスを組み合わせて新しいサービスを構成します。

val get = cacheFilter andThen searchFilter andThen dbService


filter

フィルタはサービスの上に重ねることができ、Scalaの関数合成のように andThenでチェインさせることができます。

cacheのフィルタです。

ヒットしたらその結果を返す、しなかったら次のサービスに問い合わせる。サービスから返ってきたのをキャッシュに入れます。

// 引数にthriftのクライアントを渡す

class CacheFilter(cacheService: CacheService.FutureIface) extends SimpleFilter[HttpRequest, HttpResponse] {
// ↓フィルタの入力      ↓ 次のservice(またはfilter) ↓フィルタの出力
override def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]): Future[HttpResponse] = {
cacheService.get(request.getUri) flatMap { // cacheService のクライアントでgetを呼び出す。getはFutureで返ってくるのでflatMapを使う
_.hit match {
case Some(v) => { // キャッシュにヒットしたら、次のserviceを呼ばずに HttpResponseにつめて返す
val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
response.setContent(copiedBuffer(v, Utf8))
Future(response)
}
case _ => { // ヒットしなかったら次のserviceを呼びだす
service(request) flatMap { res =>
// 結果をキャッシュに書き戻す
cacheService.put(request.getUri, res.getContent.toString(Utf8)) map { i => res }
}
}
}
}
}
}

searchは、HTTPRequstを解釈して検索キーワードを取り出し、Searchのサービスに問い合わせます。

class SearchFilter(log:Logger, searchService: SearchService.FutureIface) extends  Filter[HttpRequest, HttpResponse, List[Int], List[String]] {

override def apply(request: HttpRequest, service: Service[List[Int], List[String]]): Future[HttpResponse] = {
log.info(request.toString + " " + request.getUri)

// まずHttpRequestからList[Int] を作って次のサービスに渡します
val decoded = java.net.URLDecoder.decode(request.getUri, "UTF-8")
val keyword = decoded.split("/")(1)

val flist:List[Int] = searchService.search(keyword) flatMap { res:Seq[Int] => // searchを呼び出す。futureで返ってくるのでflatMapで取り出す
log.info(keyword + " " + res.toString)
service(res.toList) // 次のサービスを呼び出す
}
// 次はList[String]からHttpResponseを作ります
flist map { l =>
val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
val str = l match {
case List() => response.setStatus(HttpResponseStatus.NOT_FOUND); "" // 空の場合Responseに404をセットして空文字列を返す
case list => list.map("\""+_+"\"").mkString (",") // ''"見つかった","見つかった","見つかった"' のような文字列を作る
}
response.setContent (copiedBuffer(s"[${str}]", Utf8) )
response
}
}
}

DBは、idのリストを受け取るのでidごとに問い合わせてcollectで集めて返します。

class StoreGetService(log:Logger, dbService: DbService.FutureIface) extends Service[List[Int], List[String]] {

override def apply(request: List[Int]): Future[List[String]] = {
val list :List[Future[String]] = request.map (storeService.get(_))
Future.collect(list) map (_.toList)
}
}

これらはHTTPサーバの中で次のように利用します。

class HTTPServerImpl(log: Logger, searchAddr: InetSocketAddress, cacheAddr: InetSocketAddress, dbAddr: InetSocketAddress) extends Service[HttpRequest, HttpResponse] {

// thriftの各クライアント
private[this] lazy val cacheClient = Thrift.newIface[CacheService.FutureIface](cacheAddr.getHostName+":"+cacheAddr.getPort) // newIfaceにはInetSocketAddrは受け取れず、"host:port"という文字列で渡す。"host:port"という形式はtoStringでも作れない。どうやって作るのが正解なんだろう
private[this] lazy val dbClient = Thrift.newIface[StoreService.FutureIface](dbAddr.getHostName+":"+dbAddr.getPort)
private[this] lazy val searchClient = Thrift.newIface[SearchService.FutureIface](searchAddr.getHostName+":"+searchAddr.getPort)

// キャッシュ
val cacheFilter = new CacheFilter(cacheClient)
// 検索
val searchFilter = new SearchFilter(log, searchClient)
// DB
val dbService = new StoreGetService(log, dbClient)
// getのサービス
val get = cacheFilter andThen searchFilter andThen dbService
...


組み込みのフィルタ

リトライやタイムアウトといったよく使う処理のフィルタはfinagleに組み込まれておりすでに作ったサービスと組み合わせることができます。

getでは2回リトライ、10秒でタイムアウトしたい場合、RetryingFilter、TimeoutFiltrを生成してgetの前にくっつけます。

  val retryFilter = new RetryingFilter[HttpRequest, HttpResponse](RetryPolicy.tries(2), DefaultTimer.twitter)

val timeoutFilter = new TimeoutFilter[HttpRequest, HttpResponse](10.seconds, DefaultTimer.twitter)

val post = timeoutFilter andThen retryFilter andThen
val get = timeoutFilter andThen retryFilter andThen cacheFilter andThen searchFilter andThen dbService


完成

最終的なHttpサーバのapplyメソッドは以下のようになります。

  override def apply(request: HttpRequest): Future[HttpResponse] = {

request.getMethod match {
case HttpMethod.GET => get(request)
case HttpMethod.POST => post(request)
case _ =>
Future.value(new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.BAD_REQUEST))
}
}


動かす

いっぱいサーバを動かすのでめんどいですが…。

キャッシュと検索サーバとDBを動かします。

thriftのポートはそれぞれ、49093、49092、49093を使うことにします。

$ sbt 'run-main com.github.iwag.StoreServer -admin.port=:49991 -bind=:49091'

$ sbt 'run-main com.github.iwag.SearchServer -admin.port=:49992 -bind=:49092'
$ sbt 'run-main com.github.iwag.CacheServer -admin.port=:49993 -bind=:49093'
$ sbt 'run-main com.github.iwag.HTTPServer -admin.port=:49990 -http=:48000 -search=:49092 -cache=:49093 -db=:49091'
$
$ curl -XPOST http://localhost:48000/ -d'hello world'
OK
$ curl -XPOST http://localhost:48000/ -d'世界世界'
OK
$ curl -XPOST http://localhost:48000/ -d'hello 世界'
OK
$ curl -XPOST http://localhost:48000/ -d'こんにちわ'
OK
$ curl -XPOST http://localhost:48000/ -d'こんにちわ世界'
OK
$ curl http://localhost:48000/hello
["hello 世界","hello hello"]
$ curl http://localhost:48000/%E4%B8%96%E7%95%8C
["hello 世界","こんにちわ世界","世界世界"]

できました。

なお、完全なソースコードはgithubにあります。


最後に

finagle流行ってくれ。