LoginSignup
0
0

More than 1 year has passed since last update.

Apache Camel で `transacted()` と `split()` を組み合わせた際に StackOverflowError

Last updated at Posted at 2021-01-28

(2021/07/21追記)本問題は Camel 3.8 で修正された。


概要

Apache Camel でDBから読み取ったレコードをループ処理したところ、1000件近いデータは処理できず StackOverflowError が発生した。

調べていくと、 split() でループさせるだけなら問題ないが、それが transacted() より後である場合に発生することがわかった。

うまい解決ができていないが、再現と回避のために試したことをまとめておく。

回避策の案

  • JVMのスタックサイズを増やす
    • お手軽、コード変更は不要
    • データが増えれば結局 StackOverflowError が発生する
  • ループ内を別スレッドで処理する
    • 変更は1〜2行で済む
    • トランザクションから外れるので、DBを読み書きしているような処理では採用できない
  • split() を多重にして、一段あたりのループ回数を抑える
    • ループ内の処理も合わせて修正する必要がある
    • データが増えれば結局 StackOverflowError が発生する
      (とはいえ1重で800回だったのなら、3重で1500万回のループには耐えられる見込み1

テスト環境

以前に勉強用に作ったサンプルを流用したため、環境も同じ。
https://qiita.com/HMMNRST/items/da89ce94c203633a8fb3

  • Java: 1.8.0_191
  • Maven: 3.6.3
  • Camel: 3.2.0
    • Spring: 5.2.5.RELEASE
    • Spring Boot: 2.2.6.RELEASE
  • MySQL: 5.7.30

単純な再現例

長さ1万の配列をループ処理するだけのrouteを作って実験してみる。

以下のコードでは、進捗を見れるようにループの中で時々インデックスを出力している2

src/main/java/org/example/mycamelapp/routes/SampleRoute.java
@Component
public class SampleRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:sample?repeatCount=1")
                .transacted()

                .split(constant(new int[10000]))
                    .process(exchange -> {
                        int index = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
                        if ((index & (index - 1)) == 0) System.out.println(index);
                    })
                .end()

                .log("transaction finished.")
        ;
    }
}

JVMのスタックサイズを指定して実行する。( -Xss オプション)

terminal
cd path/to/app

# create JAR file
mvn clean package spring-boot:repackage

# run (send SIGINT after 20 seconds)
timeout -sINT 20s \
    java -Xss256k -jar target/mycamelapp-1.0-SNAPSHOT.jar

手元の環境では、スタックサイズ256KBだとループ140回程度、1MB(デフォルト)だと800回程度、4MBだと3000回程度で StackOverflowError が発生した。16MBなら1万回は大丈夫だった。

一方で、 .transacted() を消した場合は256KBでも問題なく処理できた。

(2021/07/21追記) Camel 3.8 を使った場合も問題なく処理できた。

スタックトレースの確認

camel の内部のソースコードを理解するのは難しいが、どういう動作でエラーが発生したか確認しておく。ループ中の StackOverflowError なので、恐らく再帰的な呼び出しになっていると予想はつく。

実際、以下の部分が再帰になっていた。

Stack Trace Source Code (3.2.0)
... ...
CamelInternalProcessor.process() CamelInternalProcessor.java:286
MulticastProcessor\$MulticastState.lambda\$run\$1() MulticastProcessor.java:364
AsyncCompletionService\$Task.run() AsyncCompletionService.java:150
DefaultReactiveExecutor\$Worker.schedule() DefaultReactiveExecutor.java:148
DefaultReactiveExecutor.schedule() DefaultReactiveExecutor.java:55
CamelInternalProcessor\$AsyncAfterTask.done() CamelInternalProcessor.java:186
TransactionErrorHandler.process() TransactionErrorHandler.java:126
CamelInternalProcessor.process() CamelInternalProcessor.java:286

.transacted() を消した場合のスタックトレースも見てみたが、もっと根元に近い部分からメソッド呼び出しが変わっていて、うまく比較できなかった。

エラー回避策(欠点あり)

.split() に使える設定を眺めていたところ、並列処理にしてループの度にスレッドを分ければいいのではないかと思いついた。スレッドプールの指定もできるので3、処理順序を保つ必要があるなら1スレッドだけ割り当てればいい。

diff
                .split(constant(new int[10000]))
+                   .parallelProcessing()
+                   .executorService(Executors.newSingleThreadExecutor())
+
                    .process(exchange -> {

スレッドを分けるオーバーヘッドのためか、処理は遅くなる。ループ内部の処理時間が十分長ければ気にしなくていいと思う。(未確認)

もっと重大な問題について次節で扱う。

DBの絡む例

「スレッドを別にした場合、DBのトランザクションは共有されるのか?」という不安が浮かんだので、もう少し複雑にしたrouteで実験する。

ループの外でDBレコードを作成し、中で取得する。同じトランザクションでないとこの新規レコードは見えない。

src/main/java/org/example/mycamelapp/routes/SampleRoute.java
@Component
public class SampleRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:sample?repeatCount=1")
                .transacted()

                // create a new task and get its ID
                .to("sql:INSERT INTO task () VALUES ()")
                .to("sql:SELECT LAST_INSERT_ID()?outputType=SelectOne")
                .setHeader("task_id", body())

                .setBody(constant(new int[10000]))
                .split(body())
                    .parallelProcessing() // BAD!!

                    // get the task
                    .to("sql:SELECT * FROM task WHERE task_id = :#task_id?outputType=SelectOne")

                    // (debug)
                    .process(exchange -> {
                        int index = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
                        if ((index & (index - 1)) == 0)
                            System.out.printf("index: %d, body: %s%n", index, exchange.getIn().getBody());
                    })
                .end()

                .log("transaction finished.")
        ;
    }
}

これを実行すると、SELECT文でレコードを取得できていない(nullになっている)ことがわかる。 .parallelProcessing() を消した場合はレコードを取得できる(が StackOverflowError になる)。

エラー回避策(第2案)

他にも色々と試していたところ、再帰処理は split() が終わる(対応する end() を抜ける)と完了することがわかった。これはループのネストは問題になりにくいことを示している4。ならば現在1重のループを敢えて多重に変形すると解決するかもしれない。

このためには、 split() に渡すデータを分割しなければいけない。 ruby でいう #each_slice みたいなことをしたいのだが、 camel に適切なものがあるか分からなかったので、類似のメソッド tokenize() を参考に自作した。

src/main/java/org/example/mycamelapp/routes/SampleRoute.java
@Component
public class SampleRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:sample?repeatCount=1")
                .transacted()

                // create a new task and get its ID
                .to("sql:INSERT INTO task () VALUES ()")
                .to("sql:SELECT LAST_INSERT_ID()?outputType=SelectOne")
                .setHeader("task_id", body())

                .setBody(constant(new int[10000]))
                .split(group(body(), 1000))
                .split(group(body(), 100))
                .split(body())

                    // get the task
                    .to("sql:SELECT * FROM task WHERE task_id = :#task_id?outputType=SelectOne")

                    // (debug)
                    .process(exchange -> {
                        int index = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
                        if ((index & (index - 1)) == 0)
                            System.out.printf("index: %d, body: %s%n", index, exchange.getIn().getBody());
                    })
                .end()
                .end()
                .end()

                .log("transaction finished.")
        ;
    }

    // org.apache.camel.support.builder.*
    public static ValueBuilder group(ValueBuilder builder, int group) {
        return new ValueBuilder(ExpressionBuilder.groupIteratorExpression(
                builder.getExpression(), null, "" + group, false
        ));
    }
}

ループを 100 * 10 * 10 ≧ 10000 に分けたので、スタックの消費は 100 + 10 + 10 = 120 程度で済む。実際このコードはスタックサイズ256KB(元々は140回が限界)でもエラーにならなかった。
※ 3重なら3乗根が最もスタックを節約できる

この例ではループの中は修正していないが、 Exchange.SPLIT_INDEX の値は一番内側のループのカウントなので、元と異なるログ出力になっている。他にもループを多重化したことへの対応が必要になるパターンは多いと思われる。

参考


  1. 各段250回とすれば、全体のループは 250 ** 3 = 15625000回、スタック消費は 250 * 3 = 750 ( ≦ 800 )。 

  2. IDEで .process() のラムダ式内にブレークポイントを設置すれば、メッセージ処理を一時停止してスタックトレースの確認もできる。 

  3. スレッドプールを指定した場合は自動的に .parallelProcessing() もONになるので、両方指定する必要は無い。 

  4. 各ループ回数の掛け算でなく足し算で StackOverflowError になるかどうか決まるということ。 

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0