こんにちは、
今回は個人的に注目しているfinagleとthriftを使って簡単なmicro-servicesの作成を行うチュートリアルを紹介します。
finagleについてはseratchさんの翻訳、Scalaのmicro services事例はokapiesさんのエントリが詳しいと思います。
題材
日本語も検索できる全文検索サービスを作ります。POSTでデータを入れて、GETでヒットしたドキュメントを表示するといった簡単な仕様です。
twitterのscalaチュートリアルにsearchbird というfinagleとthriftを使って検索エンジンを作るチュートリアルがありました。
searchbirdは更新されておらず動かない+古いということでもとに紹介したいと思います。
また、micro-servicesっぽい構成にするために無理矢理ですが、3つのサーバに分けました。
- キャッシュ
- 検索サーバ
- DB
キャッシュはpathをキーにレスポンスをキャッシュします。ずっと持っててもしょうがないので10秒くらいしたら飛ばします。
検索サーバにはインデックスしか持たず、ドキュメントの本文はDBに持たせて、idを使ってDBから引くようにします。
また元のsearchbirdでは空白区切りの単語ごとの転置インデックスでしたが、日本語に対応したいのでバイグラムの転置インデックスにします。
各サーバへはThriftでつなぎます。
thrift定義
各サーバ間のインターフェイスをThrift で記述します。
まずキャッシュサーバのインターフェイスを定義します。
struct CacheResponse {
1 option string hit
2 string msg
}
service CacheService {
CacheResponse get(1: string key)
void put(1: string key, 2: string value)
}
次の節で説明しますが、これがScalaのコードに変換されます。
文字列をキーにして、一定時間文字列をキャッシュします。
getの返り値はOption[String]にしたいのですが、thriftではoption string get() のように定義できないため構造体を使います。
検索サーバのインターフェイスです。
service SearchService {
list<int> search(1: string key)
i32 put(1: string value)
}
見つかったドキュメントのidをリストにして返します。
このidはputした際にSearchServiceで決めてあげます。
DBのインターフェイスです。
service DbService {
string get(1: i32 key) (1: BaseException exp)
void put(1: i32 id, 2: string value)
}
ドキュメント本体はSearchで決めたidをキーにDBに格納します。こちらは変なIDでgetした場合は例外なのでExceptionで伝えます。
なお、Thriftの機能はこれ以外にもいろいろあります。
参考: http://thrift.apache.org/docs/idl
https://git-wip-us.apache.org/repos/asf?p=thrift.git;a=blob_plain;f=tutorial/tutorial.thrift
scrooge
Thrift は公式のScalaバインディングがないのでtwitterが作っているscroogeというツールを使います。
project/plugins.sbt
addSbtPlugin("com.twitter" %% "scrooge-sbt-plugin" % "3.16.3")
build.sbt
com.twitter.scrooge.ScroogeSBT.newSettings
thriftのコードをsrc/main/thrift に置くと、コンパイル時に同時にScalaのソースファイルがtarget/scala-2.10/src_managedに生成されます。
自動生成のソースは長いですが、重要なのはFutureのインターフェイス(FutureIface)です。
// キャッシュサーバ
object CacheService {
...
trait FutureIface extends CacheService[Future] {
def get(key: String): Future[CacheResponse]
def put(key: String, value: String): Future[Unit]
}
}
object CacheResponse {
def apply(
hit: Option[String] = None,
msg: String
): CacheResponse =
new Immutable(
hit,
msg
)
...
}
// 検索サーバ
object SearchService {
...
trait FutureIface extends SearchService[Future] {
def search(key: String): Future[Seq[Int]]
def put(value: String): Future[Int]
def delete(id: Int): Future[Unit]
}
...
}
// DB
object DbService {
...
trait FutureIface extends DbService[Future] {
def get(key: Int): Future[String]
def put(id: Int, value: String): Future[Unit]
def delete(id: Int): Future[Unit]
}
...
}
特に驚きはない感じでScalaのインターフェイスが生成されます。
これらserviceから実際のサーバとクライアントは以下のように作ります。
val cacheClient = Thrift.client.newIface[CacheService.FutureIface]("localhost:9090")
val cacheServer = Thrift.serveIface("localhost:9090", new CacheServerImpl())
serveIfaceの引数2は、XXXService.FutureIface型で自分で実装するserviceの実体を指定します。
なお、scroogeやfinagleのexampleには古い作り方で作って利しているので注意です(ドキュメントの日付に注意)。
各サーバの実装
長くなるので省略します。githubにソースを上げているのでそちらを参照してください。
twitter-server
次にHTTPのサーバ部を作ります。
twitter-server は
引数処理、ロギングやヘルスチェック、トレース、プロファイルなど全部入りのライブラリで、サーバを簡単に作れるのでこれを使います。
機能 http://twitter.github.io/twitter-server/Features.html
twitter-serverやfinagle、scroogeに言えることですが、twitterのScalaライブラリは基本Scala 2.10まで、sbtも0.12までですので、
build.sbtやbuild.propertiesに明記する必要があります。
とりあえず、一番最初は管理用のHTTPサーバだけ動かします。
object HTTPServer extends TwitterServer {
def main() {
onExit {
adminHttpServer.close()
}
log.info("start admin:"+adminHttpServer.boundAddress)
Await.all(adminHttpServer,httpServer)
}
}
管理サーバのデフォルトのポートは9090で変更するには、引数にadmin.portを指定します。
$ sbt compile
$ sbt 'run-main org.github.iwag.HTTPServer -admin.port=:49090'
$ curl localhost:49990/admin
/admin/pprof/contention
/admin/metrics_graphs
/admin/pprof/profile
/admin/metrics.json
/admin/server_info
/admin/pprof/heap
/admin/contention
/abortabortabort
/admin/announcer
/admin/shutdown
/admin/clients/
/admin/logging
/admin/tracing
/admin/threads
/admin/metrics
/quitquitquit
/admin/files
/admin/ping
/admin/dtab
/health
/admin/
/admin
見ての通りいろいろな管理用のRestAPIがあります。
/health→OKとか/admin/pprof/profileでプロファイルがとれたり、コネクション数が見れたり、/admin/metrics.jsonでgcの回数とか値が取れたり、
/admin/shutdown で停止したりできます。
ただし metrics を見るにはbuild.sbtにfinagle-statsの追加が必要です。
"com.twitter" %% "finagle-stats" % "6.22.0",
$ curl localhost:49990/admin/metrics.json?pretty=true
{
"clnt/0.0.0.0:49092/available" : 2.0,
"clnt/0.0.0.0:49092/cancelled_connects" : 0,
"clnt/0.0.0.0:49092/closechans" : 0,
"clnt/0.0.0.0:49092/closed" : 0,
"clnt/0.0.0.0:49092/closes" : 0,
"clnt/0.0.0.0:49092/codec_connection_preparation_latency_ms.avg" : 0.0,
"clnt/0.0.0.0:49092/codec_connection_preparation_latency_ms.count" : 0,
"clnt/0.0.0.0:49092/codec_connection_preparation_latency_ms.max" : 0,
...
}
数えられないくらいのmetricsが出ます。
ちなみに、ヒープダンプを取る /admin/pprof/heap を有効にするには以下の記述が必要です(これは完全にドキュメントがなかった)。
"com.twitter" %% "util-jvm" % "6.22.0"
twittr-server全部便利な感じなんですがドキュメントが少ない感じでソースに当たらないとダメだしドキュメントもたいがい古いです。
twitter-serverで一番参考になったのはfinatraで、使い方がわからない場合finatraのソースを参考にするとよいです。
HTTPサーバ
とりあえず、GETリクエストに対して200 OKを返すようにします。
これはHTTPリクエストからHTTPレスポンスを返すサービスで、finagleでは以下のような型になります。
trait Service[HTTPRequest, HTTPResponse]
HTTPRequestとHTTPResponseは Nettyのクラスです。が、Netty4向けの新しいhttpxではfinagle独自のRequest等が定義されるようです。
// HTTPサービスの実体、HTTPRequestからHTTPResponseを返す
class HTTPServerImpl() extends Service[HttpRequest, HttpResponse] {
override def apply(request: HttpRequest): Future[HttpResponse] = Future.value{
val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
response.setContent(copiedBuffer("OK", Utf8))
response
}
}
object HTTPServer extends TwitterServer {
// HTTPサーバにバインドするアドレス。引数から取れる
val httpAddr = flag("http", new InetSocketAddress(48080), "HTTP bind address")
def main() {
// エンドポイントに対してハンドラとしてサービスを割り当てる
val httpMux = new HttpMuxer().withHandler("/", new HTTPServerImpl())
// サーバを作成する
val httpServer = Http.serve(httpAddr(), httpMux)
onExit {
adminHttpServer.close()
httpServer.close()
}
log.info("start admin:" +adminHttpServer.boundAddress)
log.info("start http:"+ httpServer.boundAddress)
Await.all(adminHttpServer,httpServer)
}
}
$ sbt 'run-main org.github.iwag.HTTPServer -admin.port=:49090 -http=:48080'
$ curl localhost:48080/
OK%
POST
POSTして、キャッシュとDBに保存するところを作ります。
まず先ほどのHTTPサーバの実装を以下のように変更します。
class HTTPServerImpl() extends Service[HttpRequest, HttpResponse] {
override def apply(request: HttpRequest): Future[HttpResponse] = request.getMethod match {
case HttpMethod.GET => ??? // あとで
case HttpMethod.POST => post(request) // これを作る
case _ =>
Future.value(new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.BAD_REQUEST))
}
}
ここでもpostのとこだけサービスにして切り出します。
ScalaでよくあるFutureを使ったプログラミングです:Scala document。
class PostService() extends Service[HttpRequest, HttpResponse] {
// DB用とSearch用のそれぞれのThriftのクライアントです
private[this] lazy val dbClient = Thrift.newIface[StoreService.FutureIface](dbAddr.getHostName+":"+dbAddr.getPort)
private[this] lazy val searchClient = Thrift.newIface[SearchService.FutureIface](searchAddr.getHostName+":"+searchAddr.getPort)
override def apply(request: HttpRequest): Future[HttpResponse] = {
val body = request.getContent.toString(Utf8)
val stored:Future[Unit] = searchClient.put(body) flatMap { i:Int => // SearchServiceにputで問い合わせて、flatMapで返ってきたidと
dbClient.put(i, body) // 文字列をdbServiceにputします
}
stored map { _ => // _:UnitだけどFutureで返したいのでmapのなかでResponseを作る
val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
response.setContent(copiedBuffer("OK", Utf8))
response
}
}
}
get
finagleはサービスやフィルタを組み合わせて新しいサービス(やフィルタ)を作ることができます。
get側はfinagleっらしくcache, searchのフィルタとdbのサービスを組み合わせて新しいサービスを構成します。
val get = cacheFilter andThen searchFilter andThen dbService
filter
フィルタはサービスの上に重ねることができ、Scalaの関数合成のように andThenでチェインさせることができます。
cacheのフィルタです。
ヒットしたらその結果を返す、しなかったら次のサービスに問い合わせる。サービスから返ってきたのをキャッシュに入れます。
// 引数にthriftのクライアントを渡す
class CacheFilter(cacheService: CacheService.FutureIface) extends SimpleFilter[HttpRequest, HttpResponse] {
// ↓フィルタの入力 ↓ 次のservice(またはfilter) ↓フィルタの出力
override def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]): Future[HttpResponse] = {
cacheService.get(request.getUri) flatMap { // cacheService のクライアントでgetを呼び出す。getはFutureで返ってくるのでflatMapを使う
_.hit match {
case Some(v) => { // キャッシュにヒットしたら、次のserviceを呼ばずに HttpResponseにつめて返す
val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
response.setContent(copiedBuffer(v, Utf8))
Future(response)
}
case _ => { // ヒットしなかったら次のserviceを呼びだす
service(request) flatMap { res =>
// 結果をキャッシュに書き戻す
cacheService.put(request.getUri, res.getContent.toString(Utf8)) map { i => res }
}
}
}
}
}
}
searchは、HTTPRequstを解釈して検索キーワードを取り出し、Searchのサービスに問い合わせます。
class SearchFilter(log:Logger, searchService: SearchService.FutureIface) extends Filter[HttpRequest, HttpResponse, List[Int], List[String]] {
override def apply(request: HttpRequest, service: Service[List[Int], List[String]]): Future[HttpResponse] = {
log.info(request.toString + " " + request.getUri)
// まずHttpRequestからList[Int] を作って次のサービスに渡します
val decoded = java.net.URLDecoder.decode(request.getUri, "UTF-8")
val keyword = decoded.split("/")(1)
val flist:List[Int] = searchService.search(keyword) flatMap { res:Seq[Int] => // searchを呼び出す。futureで返ってくるのでflatMapで取り出す
log.info(keyword + " " + res.toString)
service(res.toList) // 次のサービスを呼び出す
}
// 次はList[String]からHttpResponseを作ります
flist map { l =>
val response = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
val str = l match {
case List() => response.setStatus(HttpResponseStatus.NOT_FOUND); "" // 空の場合Responseに404をセットして空文字列を返す
case list => list.map("\""+_+"\"").mkString (",") // ''"見つかった","見つかった","見つかった"' のような文字列を作る
}
response.setContent (copiedBuffer(s"[${str}]", Utf8) )
response
}
}
}
DBは、idのリストを受け取るのでidごとに問い合わせてcollectで集めて返します。
class StoreGetService(log:Logger, dbService: DbService.FutureIface) extends Service[List[Int], List[String]] {
override def apply(request: List[Int]): Future[List[String]] = {
val list :List[Future[String]] = request.map (storeService.get(_))
Future.collect(list) map (_.toList)
}
}
これらはHTTPサーバの中で次のように利用します。
class HTTPServerImpl(log: Logger, searchAddr: InetSocketAddress, cacheAddr: InetSocketAddress, dbAddr: InetSocketAddress) extends Service[HttpRequest, HttpResponse] {
// thriftの各クライアント
private[this] lazy val cacheClient = Thrift.newIface[CacheService.FutureIface](cacheAddr.getHostName+":"+cacheAddr.getPort) // newIfaceにはInetSocketAddrは受け取れず、"host:port"という文字列で渡す。"host:port"という形式はtoStringでも作れない。どうやって作るのが正解なんだろう
private[this] lazy val dbClient = Thrift.newIface[StoreService.FutureIface](dbAddr.getHostName+":"+dbAddr.getPort)
private[this] lazy val searchClient = Thrift.newIface[SearchService.FutureIface](searchAddr.getHostName+":"+searchAddr.getPort)
// キャッシュ
val cacheFilter = new CacheFilter(cacheClient)
// 検索
val searchFilter = new SearchFilter(log, searchClient)
// DB
val dbService = new StoreGetService(log, dbClient)
// getのサービス
val get = cacheFilter andThen searchFilter andThen dbService
...
組み込みのフィルタ
リトライやタイムアウトといったよく使う処理のフィルタはfinagleに組み込まれておりすでに作ったサービスと組み合わせることができます。
getでは2回リトライ、10秒でタイムアウトしたい場合、RetryingFilter、TimeoutFiltrを生成してgetの前にくっつけます。
val retryFilter = new RetryingFilter[HttpRequest, HttpResponse](RetryPolicy.tries(2), DefaultTimer.twitter)
val timeoutFilter = new TimeoutFilter[HttpRequest, HttpResponse](10.seconds, DefaultTimer.twitter)
val post = timeoutFilter andThen retryFilter andThen
val get = timeoutFilter andThen retryFilter andThen cacheFilter andThen searchFilter andThen dbService
完成
最終的なHttpサーバのapplyメソッドは以下のようになります。
override def apply(request: HttpRequest): Future[HttpResponse] = {
request.getMethod match {
case HttpMethod.GET => get(request)
case HttpMethod.POST => post(request)
case _ =>
Future.value(new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.BAD_REQUEST))
}
}
動かす
いっぱいサーバを動かすのでめんどいですが…。
キャッシュと検索サーバとDBを動かします。
thriftのポートはそれぞれ、49093、49092、49093を使うことにします。
$ sbt 'run-main com.github.iwag.StoreServer -admin.port=:49991 -bind=:49091'
$ sbt 'run-main com.github.iwag.SearchServer -admin.port=:49992 -bind=:49092'
$ sbt 'run-main com.github.iwag.CacheServer -admin.port=:49993 -bind=:49093'
$ sbt 'run-main com.github.iwag.HTTPServer -admin.port=:49990 -http=:48000 -search=:49092 -cache=:49093 -db=:49091'
$
$ curl -XPOST http://localhost:48000/ -d'hello world'
OK
$ curl -XPOST http://localhost:48000/ -d'世界世界'
OK
$ curl -XPOST http://localhost:48000/ -d'hello 世界'
OK
$ curl -XPOST http://localhost:48000/ -d'こんにちわ'
OK
$ curl -XPOST http://localhost:48000/ -d'こんにちわ世界'
OK
$ curl http://localhost:48000/hello
["hello 世界","hello hello"]
$ curl http://localhost:48000/%E4%B8%96%E7%95%8C
["hello 世界","こんにちわ世界","世界世界"]
できました。
なお、完全なソースコードはgithubにあります。
最後に
finagle流行ってくれ。