完了まで約2時間半かかっていた既存のバッチ処理を、最近興味があったReactiveでリプレイスして10分以内に完了するようにした時の躓きポイントのまとめ
Scala/Akka Streamをチョイスした理由は、下記のスライドに触発されたのと、Spark on EMRを触っていてScalaに躓いていたのでScalaともう少し友達になりたかったからです。
バッチを Akka Streams で再実装したら100倍速くなった話 #ScalaMatsuri
from Kazuki Negoro
システム概要
処理概要
- MariaDBから処理対象のデータIDを取得
- idから必要となるデータを取得して、JSONを生成
- ElasticSearchのBulk APIでデータ投入
- ElasticSearchのAlias切り替え
- 1〜3を繰り返す
サーバ
EC2(vCPU 4コア)
既存システム概要
- PHP 5.4.x
- curl_multiを使用
- forkせず1プロセスで処理
- 30万件のデータ
リプレイス環境のライブラリバージョン
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.6")
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.3.0",
"org.scala-lang" % "scala-reflect" % "2.11.7",
"org.scala-lang.modules" %% "scala-xml" % "1.0.5",
"com.typesafe.akka" % "akka-stream-experimental_2.11" % "2.0.4",
"mysql" % "mysql-connector-java" % "5.1.38",
"com.typesafe.slick" %% "slick" % "3.0.3",
"com.jolbox" % "bonecp" % "0.8.0.RELEASE",
"net.databinder.dispatch" %% "dispatch-core" % "0.11.2",
"org.json4s" %% "json4s-jackson" % "3.3.0",
"com.github.tototoshi" %% "slick-joda-mapper" % "2.0.0",
"joda-time" % "joda-time" % "2.7",
"org.joda" % "joda-convert" % "1.7",
"org.scalatest" %% "scalatest" % "2.2.4" % "test"
)
enablePlugins(JavaAppPackaging)
?1 Reactive
サーバサイド寄りの人は下記のスライドが参考になるかと思います。
よくみかけるReactive Programmingの説明(クリックストリームを使ったサンプル)はサーバサイド屋さんとしてはイメージが湧きにくかったです。
Typesafe Reactive Platformで作るReactive System from yugolf
端から端までReactive
基本Asynchronous、どこでWaitするかをデザインする
個人的には、実際に作って動かしてみてこの部分が理解するうえでキモだと感じました。
?2 端から端までReactive
実際にコードを書いたときに、前の処理の完了を待つ必要のある処理を書くのに苦労しました。
今回でいうと以下の部分
4.ElasticSearchのAlias切り替え
当初、Future.sequenceを見つけられずElasticSearchへのPUT結果(Future[Boolean])をAwait.readyで待ってみたりさせましたが、「端から端までノンブロッキング」でなければ高スループットにはならないことが身をもって分かりました。(処理時間がほぼ変わらなかった)
// Future.sequenceをつかえばFutureのリストをFutureにできる = flatMapで使える
val v: Future[Seq[Boolean]] = Future.sequence(Seq[Future[Boolean]])
実際の処理で書くとこんなかんじ(簡略版)
- Source:DBから処理対象のIDをストリームで取得
- Flow:idからJSONフォーマットのStringを作成
- Sink:ElasticSearchにJSONをPUT
object ESBulkUpdate {
def main(args: Array[String]): Unit = {
val f = for {
v1f <- esBulkPut
v1 <- Future.sequence(v1f)
v2 <- esAliasAction //4
v3f <- esBulkPut
v3 <- Future.sequence(v3f)
} yield (v1, v3)
f onSuccess {
// 終了処理とか
}
}
def esBulkPut = {
val source: Source[PutQueue, Unit] = Source.fromPublisher(db.stream(fetchQueue))
val flow: Flow[Seq[PutQueue], Future[Seq[String]], Unit] = Flow[Seq[PutQueue]].map { f =>
val jsonRecords = f.map { record =>
for {
master <- getMasterData(record.Id)
list <- getOptionData(record.Id)
} yield createEsJsonData(record, master, list)
}
db.run(DBIO.sequence(jsonRecords))
}
val sink: Sink[Future[String], Future[Seq[Future[Boolean]]]] = Sink.fold[Seq[Future[Boolean]], Future[String]](Seq()) { (x, fReq) =>
val execResult = for {
json <- fReq
esRes <- esPut(json)
} yield esRes
execResult +: x
}
source
.grouped(200)
.via(flow)
.map { jsonStringList =>
for (
list <- jsonStringList
) yield list.mkString("")
}
.runWith(sink)
}
}
?3 実行時のException
akka.jvm-exit-on-fatal-error is enabled java.lang.OutOfMemoryError: unable to create new native thread
ExecutionContextが必要なメソッドでActorSystemを都度生成する書き方になっている箇所がありスレッドが大量生成されてしまった
implicit val system = ActorSystem("xxx")
implicit val executionContext: ExecutionContext = system.dispatchers.lookup("my-dispatcher")
// 上記と同値とはならないっぽい
implicit val system = ActorSystem("xxx")
Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@69252dbf rejected from java.util.concurrent.ThreadPoolExecutor...
Slickで大量にDBIOActionを実行するとスレッドプールあふれが起こるよう。
クエリ発行をまとめることで回避
// 省略
val jsonRecords = f.map { record =>
for {
master <- getMasterData(record.Id)
list <- getOptionData(record.Id)
} yield createEsJsonData(record, master, list)
}
// Seq[DBIOAction[T]]をDBIO.sequenceで1つのDBIOActionにする
db.run(DBIO.sequence(jsonRecords))
// 省略
source
.grouped(200) // ここで200レコード単位で処理をまとめる
.via(flow)
// 省略
?4 スループットがでない
今回完全解決はできていない課題として「コストが高い処理がある場合にスループットが出ない」がありました。
SlickのDB処理でエグ目なクエリの処理コストがかかっていたので、pre-compileをするようにしたり、生クエリを書いたりしてとりあえず回避しましたが、
「最適化しても重い処理があったときにのスループットを改善する」ことは試せませんでした。
恐らく、Balanceを使うとか、MapAsyncを使うとかになるのかと思いますが、今後機会があれば試してみようと思います。
※ Slickのサンプル
// 例
def getXxxx(targetId: Rep[Long]) = {
val xxx = TableQuery[xxx]
val ddd = TableQuery[ddd]
val eee = TableQuery[eee]
(for {
ax <- xxx if ax.id === targetId
i <- ddd if ax.id === i.id
c <- eee if ax.id === c.id
} yield (c, ax, i))
.groupBy(_._1.id)
.map { case (id, group) => (id, group.map(_._3.val).sum) }
.withFilter { case (id, sumVal) => sumVal > 0L }
.map(_._1)
}
// pre-compileしたものを使う
val getXxxxCompiled = Compiled(getXxxx _)
// がんばってもパフォーマンス改善しなそうだったら生SQL
// http://krrrr38.github.io/slick-doc-ja/v3.0.out/SQLからSlickを利用する人へ.html
def getXxxx(id: Long) = {
sql"""
SELECT id
FROM xx
INNER JOIN yy on yy.id = xx.id
WHERE xx.id = $id
GROUP BY xx.id
HAVING SUM(xx.cnt) > 0
""".as[(Option[Long])]
}
まとめ
Scala歴半月程度、Reactive Systemなんじゃそりゃ状態での躓きポイントなので、初歩的な内容が多いのではないかと思います。
学習コストは高いですが、実行時間が圧倒的に短縮されたときにすごく脳汁出ると思うので、興味のある人は是非チャレンジしてみてください。