31
32

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Scala + Akka Streamで既存のバッチを10倍以上速くした話

Posted at

完了まで約2時間半かかっていた既存のバッチ処理を、最近興味があったReactiveでリプレイスして10分以内に完了するようにした時の躓きポイントのまとめ

Scala/Akka Streamをチョイスした理由は、下記のスライドに触発されたのと、Spark on EMRを触っていてScalaに躓いていたのでScalaともう少し友達になりたかったからです。

バッチを Akka Streams で再実装したら100倍速くなった話 #ScalaMatsuri
from Kazuki Negoro

システム概要

処理概要

  1. MariaDBから処理対象のデータIDを取得
  2. idから必要となるデータを取得して、JSONを生成
  3. ElasticSearchのBulk APIでデータ投入
  4. ElasticSearchのAlias切り替え
  5. 1〜3を繰り返す

サーバ

EC2(vCPU 4コア)

既存システム概要

  1. PHP 5.4.x
  2. curl_multiを使用
  3. forkせず1プロセスで処理
  4. 30万件のデータ

リプレイス環境のライブラリバージョン

plugins.sbt
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.6")
build.sbt
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "com.typesafe" % "config" % "1.3.0",
  "org.scala-lang" % "scala-reflect" % "2.11.7",
  "org.scala-lang.modules" %% "scala-xml" % "1.0.5",
  "com.typesafe.akka" % "akka-stream-experimental_2.11" % "2.0.4",
  "mysql" % "mysql-connector-java" % "5.1.38",
  "com.typesafe.slick" %% "slick" % "3.0.3",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "net.databinder.dispatch" %% "dispatch-core" % "0.11.2",
  "org.json4s" %% "json4s-jackson" % "3.3.0",
  "com.github.tototoshi" %% "slick-joda-mapper" % "2.0.0",
  "joda-time" % "joda-time" % "2.7",
  "org.joda" % "joda-convert" % "1.7",
  "org.scalatest" %% "scalatest" % "2.2.4" % "test"
)

enablePlugins(JavaAppPackaging)

?1 Reactive

サーバサイド寄りの人は下記のスライドが参考になるかと思います。
よくみかけるReactive Programmingの説明(クリックストリームを使ったサンプル)はサーバサイド屋さんとしてはイメージが湧きにくかったです。

Typesafe Reactive Platformで作るReactive System from yugolf

端から端までReactive
基本Asynchronous、どこでWaitするかをデザインする

個人的には、実際に作って動かしてみてこの部分が理解するうえでキモだと感じました。

?2 端から端までReactive

実際にコードを書いたときに、前の処理の完了を待つ必要のある処理を書くのに苦労しました。
今回でいうと以下の部分

4.ElasticSearchのAlias切り替え

当初、Future.sequenceを見つけられずElasticSearchへのPUT結果(Future[Boolean])をAwait.readyで待ってみたりさせましたが、「端から端までノンブロッキング」でなければ高スループットにはならないことが身をもって分かりました。(処理時間がほぼ変わらなかった)

// Future.sequenceをつかえばFutureのリストをFutureにできる = flatMapで使える
val v: Future[Seq[Boolean]] = Future.sequence(Seq[Future[Boolean]])

実際の処理で書くとこんなかんじ(簡略版)

  • Source:DBから処理対象のIDをストリームで取得
  • Flow:idからJSONフォーマットのStringを作成
  • Sink:ElasticSearchにJSONをPUT
main.scala
object ESBulkUpdate {
  def main(args: Array[String]): Unit = {
    val f = for {
      v1f <- esBulkPut
      v1 <- Future.sequence(v1f)
      v2 <- esAliasAction  //4
      v3f <- esBulkPut
      v3 <- Future.sequence(v3f)
    } yield (v1, v3)

    f onSuccess {
      // 終了処理とか
    }
  }

  def esBulkPut = {
    val source: Source[PutQueue, Unit] = Source.fromPublisher(db.stream(fetchQueue))

    val flow: Flow[Seq[PutQueue], Future[Seq[String]], Unit] = Flow[Seq[PutQueue]].map { f =>
      val jsonRecords = f.map { record =>
        for {
          master <- getMasterData(record.Id)
          list <- getOptionData(record.Id)
        } yield createEsJsonData(record, master, list)
      }
      db.run(DBIO.sequence(jsonRecords))
    }

    val sink: Sink[Future[String], Future[Seq[Future[Boolean]]]] = Sink.fold[Seq[Future[Boolean]], Future[String]](Seq()) { (x, fReq) =>
      val execResult = for {
        json <- fReq
        esRes <- esPut(json)
      } yield esRes
      execResult +: x
    }

    source
      .grouped(200)
      .via(flow)
      .map { jsonStringList =>
        for (
          list <- jsonStringList
        ) yield list.mkString("")
    }
    .runWith(sink)
  }
}

?3 実行時のException

akka.jvm-exit-on-fatal-error is enabled java.lang.OutOfMemoryError: unable to create new native thread

ExecutionContextが必要なメソッドでActorSystemを都度生成する書き方になっている箇所がありスレッドが大量生成されてしまった

  implicit val system = ActorSystem("xxx")
  implicit val executionContext: ExecutionContext  = system.dispatchers.lookup("my-dispatcher")

  // 上記と同値とはならないっぽい
  implicit val system = ActorSystem("xxx")

Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@69252dbf rejected from java.util.concurrent.ThreadPoolExecutor...

Slickで大量にDBIOActionを実行するとスレッドプールあふれが起こるよう。
クエリ発行をまとめることで回避

main.scala
  // 省略

  val jsonRecords = f.map { record =>
    for {
      master <- getMasterData(record.Id)
      list <- getOptionData(record.Id)
    } yield createEsJsonData(record, master, list)
  }
  // Seq[DBIOAction[T]]をDBIO.sequenceで1つのDBIOActionにする
  db.run(DBIO.sequence(jsonRecords))

  // 省略

  source
      .grouped(200) // ここで200レコード単位で処理をまとめる
      .via(flow)
      // 省略

?4 スループットがでない

今回完全解決はできていない課題として「コストが高い処理がある場合にスループットが出ない」がありました。

SlickのDB処理でエグ目なクエリの処理コストがかかっていたので、pre-compileをするようにしたり、生クエリを書いたりしてとりあえず回避しましたが、
「最適化しても重い処理があったときにのスループットを改善する」ことは試せませんでした。

恐らく、Balanceを使うとか、MapAsyncを使うとかになるのかと思いますが、今後機会があれば試してみようと思います。

※ Slickのサンプル

dao.scala
// 例
def getXxxx(targetId: Rep[Long]) = {
  val xxx = TableQuery[xxx]
  val ddd = TableQuery[ddd]
  val eee = TableQuery[eee]

  (for {
    ax <- xxx if ax.id === targetId
    i <- ddd if ax.id === i.id
    c <- eee if ax.id === c.id
  } yield (c, ax, i))
    .groupBy(_._1.id)
    .map { case (id, group) => (id, group.map(_._3.val).sum) }
    .withFilter { case (id, sumVal) => sumVal > 0L }
    .map(_._1)
}

// pre-compileしたものを使う
val getXxxxCompiled = Compiled(getXxxx _)

// がんばってもパフォーマンス改善しなそうだったら生SQL
// http://krrrr38.github.io/slick-doc-ja/v3.0.out/SQLからSlickを利用する人へ.html
def getXxxx(id: Long) = {
  sql"""
      SELECT id
      FROM xx
      INNER JOIN yy on yy.id = xx.id
      WHERE xx.id = $id
      GROUP BY xx.id
      HAVING SUM(xx.cnt) > 0
    """.as[(Option[Long])]
}

まとめ

Scala歴半月程度、Reactive Systemなんじゃそりゃ状態での躓きポイントなので、初歩的な内容が多いのではないかと思います。
学習コストは高いですが、実行時間が圧倒的に短縮されたときにすごく脳汁出ると思うので、興味のある人は是非チャレンジしてみてください。

31
32
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
31
32

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?