(2021/07/21追記)本問題は Camel 3.8 で修正された。
- Upgrading Camel 3.7 to 3.8 # Transactions and Multicast, Splitter, or Recipient List EIPs
- [CAMEL-16103] Stack size increases in split with transacted
概要
Apache Camel でDBから読み取ったレコードをループ処理したところ、1000件近いデータは処理できず StackOverflowError が発生した。
調べていくと、 split()
でループさせるだけなら問題ないが、それが transacted()
より後である場合に発生することがわかった。
うまい解決ができていないが、再現と回避のために試したことをまとめておく。
回避策の案
- JVMのスタックサイズを増やす
- お手軽、コード変更は不要
- データが増えれば結局 StackOverflowError が発生する
- ループ内を別スレッドで処理する
- 変更は1〜2行で済む
- トランザクションから外れるので、DBを読み書きしているような処理では採用できない
-
split()
を多重にして、一段あたりのループ回数を抑える- ループ内の処理も合わせて修正する必要がある
- データが増えれば結局 StackOverflowError が発生する
(とはいえ1重で800回だったのなら、3重で1500万回のループには耐えられる見込み1)
テスト環境
以前に勉強用に作ったサンプルを流用したため、環境も同じ。
https://qiita.com/HMMNRST/items/da89ce94c203633a8fb3
- Java: 1.8.0_191
- Maven: 3.6.3
- Camel: 3.2.0
- Spring: 5.2.5.RELEASE
- Spring Boot: 2.2.6.RELEASE
- MySQL: 5.7.30
単純な再現例
長さ1万の配列をループ処理するだけのrouteを作って実験してみる。
以下のコードでは、進捗を見れるようにループの中で時々インデックスを出力している2。
@Component
public class SampleRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("timer:sample?repeatCount=1")
.transacted()
.split(constant(new int[10000]))
.process(exchange -> {
int index = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
if ((index & (index - 1)) == 0) System.out.println(index);
})
.end()
.log("transaction finished.")
;
}
}
JVMのスタックサイズを指定して実行する。( -Xss
オプション)
cd path/to/app
# create JAR file
mvn clean package spring-boot:repackage
# run (send SIGINT after 20 seconds)
timeout -sINT 20s \
java -Xss256k -jar target/mycamelapp-1.0-SNAPSHOT.jar
手元の環境では、スタックサイズ256KBだとループ140回程度、1MB(デフォルト)だと800回程度、4MBだと3000回程度で StackOverflowError
が発生した。16MBなら1万回は大丈夫だった。
一方で、 .transacted()
を消した場合は256KBでも問題なく処理できた。
(2021/07/21追記) Camel 3.8 を使った場合も問題なく処理できた。
スタックトレースの確認
camel の内部のソースコードを理解するのは難しいが、どういう動作でエラーが発生したか確認しておく。ループ中の StackOverflowError
なので、恐らく再帰的な呼び出しになっていると予想はつく。
実際、以下の部分が再帰になっていた。
Stack Trace | Source Code (3.2.0) |
---|---|
... | ... |
CamelInternalProcessor.process() | CamelInternalProcessor.java:286 |
MulticastProcessor$MulticastState.lambda$run$1() | MulticastProcessor.java:364 |
AsyncCompletionService$Task.run() | AsyncCompletionService.java:150 |
DefaultReactiveExecutor$Worker.schedule() | DefaultReactiveExecutor.java:148 |
DefaultReactiveExecutor.schedule() | DefaultReactiveExecutor.java:55 |
CamelInternalProcessor$AsyncAfterTask.done() | CamelInternalProcessor.java:186 |
TransactionErrorHandler.process() | TransactionErrorHandler.java:126 |
CamelInternalProcessor.process() | CamelInternalProcessor.java:286 |
.transacted()
を消した場合のスタックトレースも見てみたが、もっと根元に近い部分からメソッド呼び出しが変わっていて、うまく比較できなかった。
エラー回避策(欠点あり)
.split()
に使える設定を眺めていたところ、並列処理にしてループの度にスレッドを分ければいいのではないかと思いついた。スレッドプールの指定もできるので3、処理順序を保つ必要があるなら1スレッドだけ割り当てればいい。
.split(constant(new int[10000]))
+ .parallelProcessing()
+ .executorService(Executors.newSingleThreadExecutor())
+
.process(exchange -> {
スレッドを分けるオーバーヘッドのためか、処理は遅くなる。ループ内部の処理時間が十分長ければ気にしなくていいと思う。(未確認)
もっと重大な問題について次節で扱う。
DBの絡む例
「スレッドを別にした場合、DBのトランザクションは共有されるのか?」という不安が浮かんだので、もう少し複雑にしたrouteで実験する。
ループの外でDBレコードを作成し、中で取得する。同じトランザクションでないとこの新規レコードは見えない。
@Component
public class SampleRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("timer:sample?repeatCount=1")
.transacted()
// create a new task and get its ID
.to("sql:INSERT INTO task () VALUES ()")
.to("sql:SELECT LAST_INSERT_ID()?outputType=SelectOne")
.setHeader("task_id", body())
.setBody(constant(new int[10000]))
.split(body())
.parallelProcessing() // BAD!!
// get the task
.to("sql:SELECT * FROM task WHERE task_id = :#task_id?outputType=SelectOne")
// (debug)
.process(exchange -> {
int index = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
if ((index & (index - 1)) == 0)
System.out.printf("index: %d, body: %s%n", index, exchange.getIn().getBody());
})
.end()
.log("transaction finished.")
;
}
}
これを実行すると、SELECT文でレコードを取得できていない(nullになっている)ことがわかる。 .parallelProcessing()
を消した場合はレコードを取得できる(が StackOverflowError になる)。
エラー回避策(第2案)
他にも色々と試していたところ、再帰処理は split()
が終わる(対応する end()
を抜ける)と完了することがわかった。これはループのネストは問題になりにくいことを示している4。ならば現在1重のループを敢えて多重に変形すると解決するかもしれない。
このためには、 split()
に渡すデータを分割しなければいけない。 ruby でいう #each_slice
みたいなことをしたいのだが、 camel に適切なものがあるか分からなかったので、類似のメソッド tokenize()
を参考に自作した。
@Component
public class SampleRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("timer:sample?repeatCount=1")
.transacted()
// create a new task and get its ID
.to("sql:INSERT INTO task () VALUES ()")
.to("sql:SELECT LAST_INSERT_ID()?outputType=SelectOne")
.setHeader("task_id", body())
.setBody(constant(new int[10000]))
.split(group(body(), 1000))
.split(group(body(), 100))
.split(body())
// get the task
.to("sql:SELECT * FROM task WHERE task_id = :#task_id?outputType=SelectOne")
// (debug)
.process(exchange -> {
int index = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
if ((index & (index - 1)) == 0)
System.out.printf("index: %d, body: %s%n", index, exchange.getIn().getBody());
})
.end()
.end()
.end()
.log("transaction finished.")
;
}
// org.apache.camel.support.builder.*
public static ValueBuilder group(ValueBuilder builder, int group) {
return new ValueBuilder(ExpressionBuilder.groupIteratorExpression(
builder.getExpression(), null, "" + group, false
));
}
}
ループを 100 * 10 * 10 ≧ 10000 に分けたので、スタックの消費は 100 + 10 + 10 = 120 程度で済む。実際このコードはスタックサイズ256KB(元々は140回が限界)でもエラーにならなかった。
※ 3重なら3乗根が最もスタックを節約できる
この例ではループの中は修正していないが、 Exchange.SPLIT_INDEX
の値は一番内側のループのカウントなので、元と異なるログ出力になっている。他にもループを多重化したことへの対応が必要になるパターンは多いと思われる。
参考
- Camel のドキュメント
- Camel のソースコード
-
transacted()
とsplit()
の組み合わせに関する質問