結論から先に
巨大なファイルを読み込んで DB に投入するケースのように、Slick 3 の非同期 DB I/O を使って一度に大量の SQL を実行する場合は:
- うまいこと DBIOAction を合成してやらないと Thread Pool の実行キューが溢れてエラーになります。
- さらにその DBIOAction は grouped で適度に分割してやらないと StackOverflow が発生します。
というお話。サンプルコードは結構適当なので間違っている部分があるかも知れません (まだ Slick 3 での記述スタイルを試行錯誤中なのと、この問題を徹夜で調べて明け方の頭で書いているためです)。
DBIOAction を合成しよう
合成しない書き方
TSV ファイルを読み込んで、レコードが既に存在していれば更新、存在していなければ追加、というありがちな処理を実装します (分離レベルをRRにするなどしなければいけないんですが省略)。結果は (追加件数, 更新件数)
を表す Future[(Int,Int)]
です。
Source.fromFile(tsvfile).getLines().map{ line =>
val id :: x :: y :: Nil = line.split("\t").toList
db.run(Foos.filter{ _.id === id }.map{ row => (row.x, row.y) }.update((x, y)))
}.flatMap{
case 0 =>
db.run((Foos += FoosRow(id, x, y)).map{ insert => (insert, 0) }
case update =>
Future.successful((0, update))
}.foldLeft((0, 0)){ case (x, y) => (x._1+y._1, x._2+y._2) })
上記は 1 つの SQL 実行ごとに db.run()
を使用しています。そして実際に実行すると数千件といった現実的なオーダーで Thread Pool のキューが溢れることがあります。これは非同期 DB I/O の処理速度よりも DBIOAction
をキューへ投入する速度が上回るためです。
java.util.concurrent.RejectedExecutionException: Task *** rejected from java.util.concurrent.ThreadPoolExecutor
全ての SQL を 1 つに合成
この問題は非同期処理の粒度が高すぎることに起因します。このため DBIOAction
を合成して db.run()
の実行回数を削減するように修正します。
db.run(DBIOAction.fold(Source.fromFile(tsvfile).getLines().map{ line =>
val id :: x :: y :: Nil = line.split("\t").toList
Foos.filter{ _.id === id }.map{ row => (row.x, row.y) }.update((x, y)).flatMap{
case 0 =>
(Foos += FoosRow(id, x, y)).map{ insert => (insert, 0) }
case update =>
DBIO.successful((0, update))
}
}, (0, 0)){ case (x, y) => (x._1+y._1, x._2+y._2) }))
全てを合成したので db.run()
の呼び出しは 1 回になりました。
ただ、ここでは Scala の遅延評価を使って全データに相当する大量の DBIOAction
オブジェクトがメモリ上に生成されないように巧妙な実装をしています。途中で toList
など即時評価となる処理を挟むと全データがメモリに展開され OOME が発生しますので十分注意して実装する必要があります。
ここで SQL の合成パターン
相互に依存しない複数の DBIOAction を逐次実行する場合は DBIOAction.seq()
や DBIOAction.fold()
を使ってまとめることができます。
// SQL1; SQL2; SQL3
db.run(DBIOAction.seq(Seq(SQL1, SQL2, SQL3)))
直前の SQL の結果によって次の SQL が決まるような条件付き実行を行う場合は flatMap
や filter
を使って合成します。
// if(SQL1 == X) SQL2
db.run(SQL1.filter{_ == X}.flatMap{ SQL2 })
// R = SQL1; if(R == X) SQL2 else if(R == Y) SQL3 else SQL4
db.run(SQL1.flatMap{
case r if r == X => SQL2
case r if r == Y => SQL3
case _ => SQL4
})
ここで $SQL_n$ は Foos.filer{_.id===id}.headOption.result
のような DBIOAction
を結果とする任意の SQL 処理です。
このように合成した DBIOAction の transactionally
で合成した全ての SQL を同一トランザクションで実行する DBIOAction を取得できます。
db.run(DBIOAction.seq(Seq(SQL1, SQL2, SQL3)).transactionally)
grouped でダサく分割実行
さて、不幸なことに Slick 3.1.1 の時点では大量の DBIOAction
を合成実行すると StackOverflow
が発生します (概ね1-2万件のオーダーでも発生します)。DBIOAction.seq()
は継続 (continuations) で実装されているため、まとめて実行する DBIOAction の件数に比例してコールスタックが消費されることが原因です。
この問題を回避するためには grouped
を使って 1 度に実行する件数を制限し結果をたたみ込むように修正します。
db.run(DBIOAction.fold(Source.fromFile(tsvfile).getLines().map{ line =>
val id :: x :: y :: Nil = line.split("\t").toList
Foos.filter{ _.id === id }.map{ row => (row.x, row.y) }.update((x, y)).flatMap{
case 0 =>
(Foos += FoosRow(id, x, y)).map{ insert => (insert, 0) }
case update =>
DBIO.successful((0, update))
}
}.grouped(1000).map{ actions =>
DBIOAction.fold(actions, (0, 0)){ case (x, y) => (x._1+y._1, x._2+y._2) }
}, (0, 0)){ case (x, y) => (x._1+y._1, x._2+y._2) })
うーん、ダサいですね…。上記で 1000 件とした grouped の単位が適切な数値なのかは論理的に説明できません。データ件数が 1000 倍になれば前述した RejectedExecutionException
問題に逆戻りしてしまうため設計で根本を解決したわけではありませんね。さらにトランザクション化する場合はトランザクションの単位が設定できません。うまい方法があったら誰か教えてください。
これは Slick 3.1.1 の設計/実装レベルの不備だと (私は) 思いますので Slick のバージョンが上がって解消されることを期待しています。とにかく、現時点でデータのインポートのような大量の SQL を実行するケースで必要な回避策だと思いますので、同じ壁に当たった人がこの記事に辿り着けるよう祈りながら書き記しておきます。