Edited at

Slick 3 で大量の SQL を実行するための注意書き

More than 3 years have passed since last update.


結論から先に

巨大なファイルを読み込んで DB に投入するケースのように、Slick 3 の非同期 DB I/O を使って一度に大量の SQL を実行する場合は:


  • うまいこと DBIOAction を合成してやらないと Thread Pool の実行キューが溢れてエラーになります。

  • さらにその DBIOAction は grouped で適度に分割してやらないと StackOverflow が発生します。

というお話。サンプルコードは結構適当なので間違っている部分があるかも知れません (まだ Slick 3 での記述スタイルを試行錯誤中なのと、この問題を徹夜で調べて明け方の頭で書いているためです)。


DBIOAction を合成しよう


合成しない書き方

TSV ファイルを読み込んで、レコードが既に存在していれば更新、存在していなければ追加、というありがちな処理を実装します (分離レベルをRRにするなどしなければいけないんですが省略)。結果は (追加件数, 更新件数) を表す Future[(Int,Int)] です。

Source.fromFile(tsvfile).getLines().map{ line =>

val id :: x :: y :: Nil = line.split("\t").toList
db.run(Foos.filter{ _.id === id }.map{ row => (row.x, row.y) }.update((x, y)))
}.flatMap{
case 0 =>
db.run((Foos += FoosRow(id, x, y)).map{ insert => (insert, 0) }
case update =>
Future.successful((0, update))
}.foldLeft((0, 0)){ case (x, y) => (x._1+y._1, x._2+y._2) })

上記は 1 つの SQL 実行ごとに db.run() を使用しています。そして実際に実行すると数千件といった現実的なオーダーで Thread Pool のキューが溢れることがあります。これは非同期 DB I/O の処理速度よりも DBIOAction をキューへ投入する速度が上回るためです。

java.util.concurrent.RejectedExecutionException: Task *** rejected from java.util.concurrent.ThreadPoolExecutor


全ての SQL を 1 つに合成

この問題は非同期処理の粒度が高すぎることに起因します。このため DBIOAction を合成して db.run() の実行回数を削減するように修正します。

db.run(DBIOAction.fold(Source.fromFile(tsvfile).getLines().map{ line =>

val id :: x :: y :: Nil = line.split("\t").toList
Foos.filter{ _.id === id }.map{ row => (row.x, row.y) }.update((x, y)).flatMap{
case 0 =>
(Foos += FoosRow(id, x, y)).map{ insert => (insert, 0) }
case update =>
DBIO.successful((0, update))
}
}, (0, 0)){ case (x, y) => (x._1+y._1, x._2+y._2) }))

全てを合成したので db.run() の呼び出しは 1 回になりました。

ただ、ここでは Scala の遅延評価を使って全データに相当する大量の DBIOAction オブジェクトがメモリ上に生成されないように巧妙な実装をしています。途中で toList など即時評価となる処理を挟むと全データがメモリに展開され OOME が発生しますので十分注意して実装する必要があります。


ここで SQL の合成パターン

相互に依存しない複数の DBIOAction を逐次実行する場合は DBIOAction.seq()DBIOAction.fold() を使ってまとめることができます。

// SQL1; SQL2; SQL3

db.run(DBIOAction.seq(Seq(SQL1, SQL2, SQL3)))

直前の SQL の結果によって次の SQL が決まるような条件付き実行を行う場合は flatMapfilter を使って合成します。

// if(SQL1 == X) SQL2

db.run(SQL1.filter{_ == X}.flatMap{ SQL2 })

// R = SQL1; if(R == X) SQL2 else if(R == Y) SQL3 else SQL4
db.run(SQL1.flatMap{
case r if r == X => SQL2
case r if r == Y => SQL3
case _ => SQL4
})

ここで $SQL_n$ は Foos.filer{_.id===id}.headOption.result のような DBIOAction を結果とする任意の SQL 処理です。

このように合成した DBIOAction の transactionally で合成した全ての SQL を同一トランザクションで実行する DBIOAction を取得できます。

db.run(DBIOAction.seq(Seq(SQL1, SQL2, SQL3)).transactionally)


grouped でダサく分割実行

さて、不幸なことに Slick 3.1.1 の時点では大量の DBIOAction を合成実行すると StackOverflow が発生します (概ね1-2万件のオーダーでも発生します)。DBIOAction.seq() は継続 (continuations) で実装されているため、まとめて実行する DBIOAction の件数に比例してコールスタックが消費されることが原因です。

この問題を回避するためには grouped を使って 1 度に実行する件数を制限し結果をたたみ込むように修正します。

db.run(DBIOAction.fold(Source.fromFile(tsvfile).getLines().map{ line =>

val id :: x :: y :: Nil = line.split("\t").toList
Foos.filter{ _.id === id }.map{ row => (row.x, row.y) }.update((x, y)).flatMap{
case 0 =>
(Foos += FoosRow(id, x, y)).map{ insert => (insert, 0) }
case update =>
DBIO.successful((0, update))
}
}.grouped(1000).map{ actions =>
DBIOAction.fold(actions, (0, 0)){ case (x, y) => (x._1+y._1, x._2+y._2) }
}, (0, 0)){ case (x, y) => (x._1+y._1, x._2+y._2) })

うーん、ダサいですね…。上記で 1000 件とした grouped の単位が適切な数値なのかは論理的に説明できません。データ件数が 1000 倍になれば前述した RejectedExecutionException 問題に逆戻りしてしまうため設計で根本を解決したわけではありませんね。さらにトランザクション化する場合はトランザクションの単位が設定できません。うまい方法があったら誰か教えてください。

これは Slick 3.1.1 の設計/実装レベルの不備だと (私は) 思いますので Slick のバージョンが上がって解消されることを期待しています。とにかく、現時点でデータのインポートのような大量の SQL を実行するケースで必要な回避策だと思いますので、同じ壁に当たった人がこの記事に辿り着けるよう祈りながら書き記しておきます。