Help us understand the problem. What is going on with this article?

finagleとthriftでmicro-servicesを作る

More than 5 years have passed since last update.

こんにちは、
今回は個人的に注目しているfinagleとthriftを使って簡単なmicro-servicesの作成を行うチュートリアルを紹介します。
finagleについてはseratchさんの翻訳、Scalaのmicro services事例はokapiesさんのエントリが詳しいと思います。

題材

日本語も検索できる全文検索サービスを作ります。POSTでデータを入れて、GETでヒットしたドキュメントを表示するといった簡単な仕様です。

twitterのscalaチュートリアルsearchbird というfinagleとthriftを使って検索エンジンを作るチュートリアルがありました。
searchbirdは更新されておらず動かない+古いということでもとに紹介したいと思います。

また、micro-servicesっぽい構成にするために無理矢理ですが、3つのサーバに分けました。

  • キャッシュ
  • 検索サーバ
  • DB

image

キャッシュはpathをキーにレスポンスをキャッシュします。ずっと持っててもしょうがないので10秒くらいしたら飛ばします。
検索サーバにはインデックスしか持たず、ドキュメントの本文はDBに持たせて、idを使ってDBから引くようにします。
また元のsearchbirdでは空白区切りの単語ごとの転置インデックスでしたが、日本語に対応したいのでバイグラムの転置インデックスにします。
各サーバへはThriftでつなぎます。

thrift定義

各サーバ間のインターフェイスをThrift で記述します。

まずキャッシュサーバのインターフェイスを定義します。

struct CacheResponse {
1 option string hit
2 string msg
}

service CacheService {
  CacheResponse get(1: string key)

  void put(1: string key, 2: string value)
}

次の節で説明しますが、これがScalaのコードに変換されます。
文字列をキーにして、一定時間文字列をキャッシュします。
getの返り値はOption[String]にしたいのですが、thriftではoption string get() のように定義できないため構造体を使います。

検索サーバのインターフェイスです。

service SearchService {
  list<int> search(1: string key)

  i32 put(1: string value)
}

見つかったドキュメントのidをリストにして返します。
このidはputした際にSearchServiceで決めてあげます。

DBのインターフェイスです。

service DbService {
  string get(1: i32 key) (1: BaseException exp)

  void put(1: i32 id, 2: string value)
}

ドキュメント本体はSearchで決めたidをキーにDBに格納します。こちらは変なIDでgetした場合は例外なのでExceptionで伝えます。

なお、Thriftの機能はこれ以外にもいろいろあります。
参考: http://thrift.apache.org/docs/idl
https://git-wip-us.apache.org/repos/asf?p=thrift.git;a=blob_plain;f=tutorial/tutorial.thrift

scrooge

Thrift は公式のScalaバインディングがないのでtwitterが作っているscroogeというツールを使います。

project/plugins.sbt
addSbtPlugin("com.twitter" %% "scrooge-sbt-plugin" % "3.16.3")
build.sbt
com.twitter.scrooge.ScroogeSBT.newSettings

thriftのコードをsrc/main/thrift に置くと、コンパイル時に同時にScalaのソースファイルがtarget/scala-2.10/src_managedに生成されます。
自動生成のソースは長いですが、重要なのはFutureのインターフェイス(FutureIface)です。

// キャッシュサーバ
object CacheService {
  ...
  trait FutureIface extends  CacheService[Future] {

    def get(key: String): Future[CacheResponse]

    def put(key: String, value: String): Future[Unit]
  }
}

object CacheResponse {
 def apply(
     hit: Option[String] = None,
     msg: String
   ): CacheResponse =
     new Immutable(
       hit,
       msg
   )   
   ...
}


// 検索サーバ
object SearchService {
  ...
  trait FutureIface extends  SearchService[Future] {

    def search(key: String): Future[Seq[Int]]

    def put(value: String): Future[Int]

    def delete(id: Int): Future[Unit]
  }
  ...
}


// DB
object DbService {
  ...
  trait FutureIface extends  DbService[Future] {

    def get(key: Int): Future[String]

    def put(id: Int, value: String): Future[Unit]

    def delete(id: Int): Future[Unit]
  }
  ...
}

特に驚きはない感じでScalaのインターフェイスが生成されます。

これらserviceから実際のサーバとクライアントは以下のように作ります。

val cacheClient = Thrift.client.newIface[CacheService.FutureIface]("localhost:9090")
val cacheServer = Thrift.serveIface("localhost:9090", new CacheServerImpl())

serveIfaceの引数2は、XXXService.FutureIface型で自分で実装するserviceの実体を指定します。

なお、scroogeやfinagleのexampleには古い作り方で作って利しているので注意です(ドキュメントの日付に注意)。

各サーバの実装

長くなるので省略します。githubにソースを上げているのでそちらを参照してください。

twitter-server

次にHTTPのサーバ部を作ります。
twitter-server
引数処理、ロギングやヘルスチェック、トレース、プロファイルなど全部入りのライブラリで、サーバを簡単に作れるのでこれを使います。
機能 http://twitter.github.io/twitter-server/Features.html

twitter-serverやfinagle、scroogeに言えることですが、twitterのScalaライブラリは基本Scala 2.10まで、sbtも0.12までですので、
build.sbtやbuild.propertiesに明記する必要があります。

とりあえず、一番最初は管理用のHTTPサーバだけ動かします。

object HTTPServer extends TwitterServer {
  def main() {

    onExit {
      adminHttpServer.close()
    }

    log.info("start admin:"+adminHttpServer.boundAddress)
    Await.all(adminHttpServer,httpServer)
  }
}

管理サーバのデフォルトのポートは9090で変更するには、引数にadmin.portを指定します。

$ sbt compile
$ sbt 'run-main org.github.iwag.HTTPServer -admin.port=:49090'
$ curl localhost:49990/admin
/admin/pprof/contention
/admin/metrics_graphs
/admin/pprof/profile
/admin/metrics.json
/admin/server_info
/admin/pprof/heap
/admin/contention
/abortabortabort
/admin/announcer
/admin/shutdown
/admin/clients/
/admin/logging
/admin/tracing
/admin/threads
/admin/metrics
/quitquitquit
/admin/files
/admin/ping
/admin/dtab
/health
/admin/
/admin

見ての通りいろいろな管理用のRestAPIがあります。
/health→OKとか/admin/pprof/profileでプロファイルがとれたり、コネクション数が見れたり、/admin/metrics.jsonでgcの回数とか値が取れたり、
/admin/shutdown で停止したりできます。

ただし metrics を見るにはbuild.sbtにfinagle-statsの追加が必要です。

"com.twitter"    %% "finagle-stats"         % "6.22.0",
$ curl localhost:49990/admin/metrics.json?pretty=true
{
  "clnt/0.0.0.0:49092/available" : 2.0,
  "clnt/0.0.0.0:49092/cancelled_connects" : 0,
  "clnt/0.0.0.0:49092/closechans" : 0,
  "clnt/0.0.0.0:49092/closed" : 0,
  "clnt/0.0.0.0:49092/closes" : 0,
  "clnt/0.0.0.0:49092/codec_connection_preparation_latency_ms.avg" : 0.0,
  "clnt/0.0.0.0:49092/codec_connection_preparation_latency_ms.count" : 0,
  "clnt/0.0.0.0:49092/codec_connection_preparation_latency_ms.max" : 0,
  ...
}

数えられないくらいのmetricsが出ます。

ちなみに、ヒープダンプを取る /admin/pprof/heap を有効にするには以下の記述が必要です(これは完全にドキュメントがなかった)。

"com.twitter"    %% "util-jvm"              % "6.22.0"

twittr-server全部便利な感じなんですがドキュメントが少ない感じでソースに当たらないとダメだしドキュメントもたいがい古いです。
twitter-serverで一番参考になったのはfinatraで、使い方がわからない場合finatraのソースを参考にするとよいです。

HTTPサーバ

とりあえず、GETリクエストに対して200 OKを返すようにします。

これはHTTPリクエストからHTTPレスポンスを返すサービスで、finagleでは以下のような型になります。

trait Service[HTTPRequest, HTTPResponse]

HTTPRequestHTTPResponseは Nettyのクラスです。が、Netty4向けの新しいhttpxではfinagle独自のRequest等が定義されるようです。

// HTTPサービスの実体、HTTPRequestからHTTPResponseを返す
class HTTPServerImpl() extends Service[HttpRequest, HttpResponse] {
  override def apply(request: HttpRequest): Future[HttpResponse] = Future.value{
      val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
      response.setContent(copiedBuffer("OK", Utf8))
      response
  }
}

object HTTPServer extends TwitterServer {
  // HTTPサーバにバインドするアドレス。引数から取れる
  val httpAddr = flag("http", new InetSocketAddress(48080), "HTTP bind address")

  def main() {
    // エンドポイントに対してハンドラとしてサービスを割り当てる
    val httpMux = new HttpMuxer().withHandler("/", new HTTPServerImpl())

    // サーバを作成する
    val httpServer = Http.serve(httpAddr(), httpMux)

    onExit {
      adminHttpServer.close()
      httpServer.close()
    }

    log.info("start admin:" +adminHttpServer.boundAddress)
    log.info("start http:"+ httpServer.boundAddress)
    Await.all(adminHttpServer,httpServer)
  }
}
$ sbt 'run-main org.github.iwag.HTTPServer -admin.port=:49090 -http=:48080'
$ curl localhost:48080/
OK%

POST

POSTして、キャッシュとDBに保存するところを作ります。

まず先ほどのHTTPサーバの実装を以下のように変更します。

class HTTPServerImpl() extends Service[HttpRequest, HttpResponse] {
  override def apply(request: HttpRequest): Future[HttpResponse] = request.getMethod match {
    case HttpMethod.GET => ??? // あとで
    case HttpMethod.POST => post(request) // これを作る
    case _ =>
      Future.value(new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.BAD_REQUEST))
  }
}

ここでもpostのとこだけサービスにして切り出します。
ScalaでよくあるFutureを使ったプログラミングです:Scala document

class PostService() extends Service[HttpRequest, HttpResponse] {
  // DB用とSearch用のそれぞれのThriftのクライアントです
  private[this] lazy val dbClient = Thrift.newIface[StoreService.FutureIface](dbAddr.getHostName+":"+dbAddr.getPort)
  private[this] lazy val searchClient = Thrift.newIface[SearchService.FutureIface](searchAddr.getHostName+":"+searchAddr.getPort)

  override def apply(request: HttpRequest): Future[HttpResponse] = {
    val body = request.getContent.toString(Utf8)
    val stored:Future[Unit] = searchClient.put(body) flatMap { i:Int => // SearchServiceにputで問い合わせて、flatMapで返ってきたidと
      dbClient.put(i, body) // 文字列をdbServiceにputします
    }
    stored map { _ => // _:UnitだけどFutureで返したいのでmapのなかでResponseを作る
      val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
      response.setContent(copiedBuffer("OK", Utf8))
      response
    }
  }
}

get

finagleはサービスやフィルタを組み合わせて新しいサービス(やフィルタ)を作ることができます。

get側はfinagleっらしくcache, searchのフィルタとdbのサービスを組み合わせて新しいサービスを構成します。

val get = cacheFilter andThen searchFilter andThen dbService

filter

フィルタはサービスの上に重ねることができ、Scalaの関数合成のように andThenでチェインさせることができます。

cacheのフィルタです。
ヒットしたらその結果を返す、しなかったら次のサービスに問い合わせる。サービスから返ってきたのをキャッシュに入れます。

// 引数にthriftのクライアントを渡す
class CacheFilter(cacheService: CacheService.FutureIface) extends SimpleFilter[HttpRequest, HttpResponse] {
  //                 ↓フィルタの入力      ↓ 次のservice(またはfilter)                      ↓フィルタの出力
  override def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]): Future[HttpResponse] = {
    cacheService.get(request.getUri) flatMap { // cacheService のクライアントでgetを呼び出す。getはFutureで返ってくるのでflatMapを使う
      _.hit match {
        case Some(v) => { // キャッシュにヒットしたら、次のserviceを呼ばずに HttpResponseにつめて返す
          val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
          response.setContent(copiedBuffer(v, Utf8))
          Future(response)
        }
        case _ => { // ヒットしなかったら次のserviceを呼びだす
          service(request) flatMap { res =>
            // 結果をキャッシュに書き戻す
            cacheService.put(request.getUri, res.getContent.toString(Utf8)) map { i => res }
          }
        }
      }
    }
  }
}

searchは、HTTPRequstを解釈して検索キーワードを取り出し、Searchのサービスに問い合わせます。

class SearchFilter(log:Logger, searchService: SearchService.FutureIface) extends  Filter[HttpRequest, HttpResponse, List[Int], List[String]] {
  override def apply(request: HttpRequest, service: Service[List[Int], List[String]]): Future[HttpResponse] = {
    log.info(request.toString + " " + request.getUri)

    // まずHttpRequestからList[Int] を作って次のサービスに渡します
    val decoded = java.net.URLDecoder.decode(request.getUri, "UTF-8")
    val keyword = decoded.split("/")(1)

    val flist:List[Int] = searchService.search(keyword) flatMap { res:Seq[Int] => // searchを呼び出す。futureで返ってくるのでflatMapで取り出す
      log.info(keyword + " " + res.toString)
      service(res.toList) // 次のサービスを呼び出す
    }
    // 次はList[String]からHttpResponseを作ります
    flist map { l =>
      val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
      val str = l match {
        case List() => response.setStatus(HttpResponseStatus.NOT_FOUND); "" // 空の場合Responseに404をセットして空文字列を返す
        case list => list.map("\""+_+"\"").mkString (",") // ''"見つかった","見つかった","見つかった"' のような文字列を作る
      }
      response.setContent (copiedBuffer(s"[${str}]", Utf8) )
      response
    }
  }
}

DBは、idのリストを受け取るのでidごとに問い合わせてcollectで集めて返します。

class StoreGetService(log:Logger, dbService: DbService.FutureIface) extends Service[List[Int], List[String]] {
  override def apply(request: List[Int]): Future[List[String]] = {
    val list :List[Future[String]] = request.map (storeService.get(_))
    Future.collect(list) map (_.toList)
  }
}

これらはHTTPサーバの中で次のように利用します。

class HTTPServerImpl(log: Logger, searchAddr: InetSocketAddress, cacheAddr: InetSocketAddress, dbAddr: InetSocketAddress) extends Service[HttpRequest, HttpResponse] {
  // thriftの各クライアント
  private[this] lazy val cacheClient = Thrift.newIface[CacheService.FutureIface](cacheAddr.getHostName+":"+cacheAddr.getPort) // newIfaceにはInetSocketAddrは受け取れず、"host:port"という文字列で渡す。"host:port"という形式はtoStringでも作れない。どうやって作るのが正解なんだろう
  private[this] lazy val dbClient = Thrift.newIface[StoreService.FutureIface](dbAddr.getHostName+":"+dbAddr.getPort)
  private[this] lazy val searchClient = Thrift.newIface[SearchService.FutureIface](searchAddr.getHostName+":"+searchAddr.getPort)

  // キャッシュ
  val cacheFilter = new CacheFilter(cacheClient)
  // 検索
  val searchFilter = new SearchFilter(log, searchClient)
  // DB
  val dbService = new StoreGetService(log, dbClient)
  // getのサービス
  val get = cacheFilter andThen searchFilter andThen dbService
  ...

組み込みのフィルタ

リトライやタイムアウトといったよく使う処理のフィルタはfinagleに組み込まれておりすでに作ったサービスと組み合わせることができます。
getでは2回リトライ、10秒でタイムアウトしたい場合、RetryingFilter、TimeoutFiltrを生成してgetの前にくっつけます。

  val retryFilter = new RetryingFilter[HttpRequest, HttpResponse](RetryPolicy.tries(2), DefaultTimer.twitter)
  val timeoutFilter = new TimeoutFilter[HttpRequest, HttpResponse](10.seconds, DefaultTimer.twitter)

  val post = timeoutFilter andThen retryFilter andThen  
  val get = timeoutFilter andThen retryFilter andThen cacheFilter andThen searchFilter andThen dbService  

完成

最終的なHttpサーバのapplyメソッドは以下のようになります。

  override def apply(request: HttpRequest): Future[HttpResponse] = {
    request.getMethod match {
      case HttpMethod.GET => get(request)
      case HttpMethod.POST => post(request)
      case _ =>
        Future.value(new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.BAD_REQUEST))
    }
  }

動かす

いっぱいサーバを動かすのでめんどいですが…。

キャッシュと検索サーバとDBを動かします。
thriftのポートはそれぞれ、49093、49092、49093を使うことにします。

$ sbt 'run-main com.github.iwag.StoreServer -admin.port=:49991 -bind=:49091'
$ sbt 'run-main com.github.iwag.SearchServer -admin.port=:49992 -bind=:49092'
$ sbt 'run-main com.github.iwag.CacheServer -admin.port=:49993 -bind=:49093'
$ sbt 'run-main com.github.iwag.HTTPServer -admin.port=:49990 -http=:48000 -search=:49092 -cache=:49093 -db=:49091'
$
$ curl -XPOST http://localhost:48000/ -d'hello world'
OK
$ curl -XPOST http://localhost:48000/ -d'世界世界'
OK
$ curl -XPOST http://localhost:48000/ -d'hello 世界'
OK
$ curl -XPOST http://localhost:48000/ -d'こんにちわ'
OK
$ curl -XPOST http://localhost:48000/ -d'こんにちわ世界'
OK
$ curl http://localhost:48000/hello
["hello 世界","hello hello"]
$ curl http://localhost:48000/%E4%B8%96%E7%95%8C
["hello 世界","こんにちわ世界","世界世界"]

できました。

なお、完全なソースコードはgithubにあります。

最後に

finagle流行ってくれ。

iwag@github
iwagです👼 、プロダクトサイドもみれるソフトウェアエンジニア(バックエンド)です。ビジネスサイドみれるようになりたいなーと思いつつ最近はデータに興味ありあり。Coronaを期に熊本住み、フルリモートで働く。東京、バンクーバー、福岡でも働いていました。
http://twitter.com/iwag_
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away