0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Apache Camel × Spring Boot のExchange に潜む“隠しリトライ情報”の罠

Last updated at Posted at 2025-04-19

Apache Camel × Spring Boot で retry ロジックを実装していて、こんな経験ありませんか?

  • 明らかに初回処理のはずなのに、Camel が「リトライ済み」と判断して処理をスキップした
  • 別ルートで再送しようとしたら、前のリトライ情報が残っていて意図しない挙動になった
  • getHeaders()getProperties() では見えない情報が残っている気がする…

この現象、実は Camel Exchange の内部に隠されたリトライ情報が原因かもしれません。

■ 隠しプロパティ(内部プロパティ)のメリット

1.ユーザーの誤操作を防ぐ

  • headers はよく使うから、初心者でも気軽に触れる → だから安全でわかりやすいものに限定
  • properties はルート全体の制御に関わるので、「意図せず上書きしちゃった」みたいな事故を防ぐ

→ 例えば Exchange.REDELIVERY_COUNTER をユーザーが適当に書き換えると、Camel の retry 機構が壊れる

2.メッセージ(Message)と処理制御(Exchange)を分ける思想

  • Camel は「メッセージの内容は Message(headers & body)に、制御情報は Exchange に」っていう思想
  • これにより、Messageはシンプルに保ちつつ、Exchangeで高度な制御ができる

3.内部の状態管理を密結合させないため

  • Camel の機能(retry、routing slip、error handler など)が相互に影響しすぎないように、
    特定の用途のプロパティは特定の処理だけが使うようにしてある(→カプセル化の一種)

Exchange の構造と隠しプロパティ

Camel の Exchange は以下のような情報を持っています:

  • Headers:ユーザーが操作するメッセージ情報(exchange.getMessage().getHeaders()
  • Properties:ルート全体に関わる情報(exchange.getProperties()
  • UnitOfWork:Camel 内部で retry 状態や rollback 状態などを管理する低レイヤーコンテキスト
  • RedeliveryData(隠し)UnitOfWork 内部に隠されていて、retry 回数や遅延などが保持される

この RedeliveryData は標準の API からは見えないため、普通にログを出しても気づけません。


予期しない挙動の例

以下のようなケースで、Camel の retry 状態が意図しない動作を引き起こすことがあります:

  • direct: で再送ルートに移動したら、すでに retry カウントが進んでいる状態で扱われた
  • retry ルート内で maximumRedeliveries=0 としているのに、スキップされた

原因は、前のルートの Exchange が持つ RedeliveryData が生きているからです。


Exchange の内部状態をすべて可視化する Processor

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.spi.UnitOfWork;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;

@Component("logExchangeInternals")
public class LogExchangeInternalsProcessor implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        System.out.println("\n========== [Camel Exchange Debug Start] ==========");

        // --- Headers ---
        Message message = exchange.getMessage();
        System.out.println("-- Headers --");
        message.getHeaders().forEach((k, v) -> System.out.printf("[Header] %s = %s%n", k, v));

        // --- Properties ---
        System.out.println("-- Properties --");
        exchange.getProperties().forEach((k, v) -> System.out.printf("[Property] %s = %s%n", k, v));

        // --- Exception 情報 ---
        Throwable ex = exchange.getException();
        if (ex != null) {
            System.out.printf("[Exception] %s: %s%n", ex.getClass().getName(), ex.getMessage());
        } else {
            System.out.println("[Exception] None");
        }

        // --- UnitOfWork + RedeliveryData(内部状態) ---
        UnitOfWork uow = exchange.getUnitOfWork();
        if (uow != null) {
            System.out.printf("[UoW] ID = %s%n", uow.getId());
            System.out.printf("[UoW] RollbackOnly = %s%n", uow.isRollbackOnly());
            System.out.printf("[UoW] Transacted = %s%n", uow.isTransacted());

            try {
                Field redeliveryDataField = uow.getClass().getDeclaredField("redeliveryData");
                redeliveryDataField.setAccessible(true);
                Object redeliveryData = redeliveryDataField.get(uow);

                if (redeliveryData != null) {
                    System.out.println("-- RedeliveryData (via reflection) --");
                    for (Field field : redeliveryData.getClass().getDeclaredFields()) {
                        field.setAccessible(true);
                        Object val = field.get(redeliveryData);
                        System.out.printf("  - %s = %s%n", field.getName(), val);
                    }
                } else {
                    System.out.println("[RedeliveryData] is null");
                }
            } catch (Exception e) {
                System.out.printf("[RedeliveryData] Not accessible: %s%n", e.getMessage());
            }
        } else {
            System.out.println("[UoW] Not available");
        }

        System.out.println("========== [Camel Exchange Debug End] ==========\n");
    }
}

出力例

========== [Camel Exchange Debug Start] ==========
-- Headers --
[Header] CamelRedelivered = true
[Header] breadcrumbId = ID-myhost-12345
-- Properties --
[Property] CamelRedeliveryCounter = 2
[Property] CamelRedeliveryDelay = 1000
-- Exception --
[Exception] java.net.SocketTimeoutException: Read timed out
-- UnitOfWork --
[UoW] ID = a1b2c3d4
[UoW] RollbackOnly = false
[UoW] Transacted = false
-- RedeliveryData (via reflection) --
  - redeliveryCounter = 2
  - redeliveryDelay = 1000
  - redelivered = true
========== [Camel Exchange Debug End] ==========

対処法:Exchange に残ったリトライ状態をリセットする方法

Camel の Exchange には、リトライ情報(RedeliveryDataCamelRedeliveryCounter など)が内部に保持されており、
別ルートに渡ったときや再送処理時に意図しない挙動を引き起こすことがあります。

以下のいずれかの方法で リトライ状態をリセットできます。


方法①:Exchange を新しく作り直す(安全・確実)

CamelContext を使って新しい Exchange を生成し、必要な情報だけ引き継ぎます。

Exchange newExchange = new DefaultExchange(camelContext);

// 必要な内容をコピー
newExchange.getMessage().setBody(oldExchange.getMessage().getBody());
newExchange.getMessage().setHeaders(Map.of(
    "X-TraceId", oldExchange.getMessage().getHeader("X-TraceId")
));

// → この newExchange を direct/seda などで次のルートに渡す

メリット:
• Camel 内部の RedeliveryData も含めて完全に初期化される

デメリット:
• メモリ使用量が少し増える
• コピー処理の実装が必要

方法②:Exchange のヘッダーとプロパティをクリアして必要なものだけ残す(軽量)

Camel の retry 判定は、Exchange.getProperties() に含まれる CamelRedeliveryCounterCamelRedelivered といった情報を参照しています。
これらを明示的に削除し、必要なヘッダーのみ復元することで「初回扱いに近い状態」にできます。

@Component("cleanExchangeProcessor")
public class CleanExchangeProcessor implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        Message message = exchange.getMessage();

        // 必要なヘッダーだけ退避(例:独自のトレースIDなど)
        Object traceId = message.getHeader("X-TraceId");

        // ヘッダーとプロパティをすべて削除
        message.getHeaders().clear();
        exchange.getProperties().clear();

        // 必要なヘッダーだけ復元
        if (traceId != null) {
            message.setHeader("X-TraceId", traceId);
        }
    }
}

この Processor をルートに挿入することで、以下のような効果があります:
• Camel がリトライ済みと判定するヘッダーやプロパティをクリア
• ユーザー定義のヘッダー(トレースIDなど)だけ保持
• Exchange 自体は新しく作らないため、メモリ効率が高い

メリット:
• 軽量・高速で実装もシンプル
• retry 状態を“見た目”初期化できる
• Camel の挙動に不整合が起きにくい

デメリット:
• Exchange の内部構造(UnitOfWork.redeliveryData)はそのままなので、
Camel の一部ロジックでは retry 済みと判定される可能性がある

方法③:seda: 経由でルートを分離する(非同期コピーで retry 状態を初期化)

Camel の seda: コンポーネントは、非同期キューを使って Exchange を別スレッドに渡す仕組みです。
このとき、Exchange が自動的にコピーされるため、リトライ関連の内部状態(RedeliveryData など)も初期化されます。

ルートの途中で direct: の代わりに seda: を使うだけで、Camel 側で retry 状態をリセットしてくれる効果があります。

サンプル構成(XML DSL)

<!-- メインルート:再送が必要になったら seda:retryRoute に渡す -->
<route id="main-route">
  <from uri="direct:start" />
  <to uri="log:beforeRetry" />
  <to uri="seda:retryRoute"/>
</route>

<!-- 再送ルート:seda 経由で受け取った Exchange はコピーされている -->
<route id="retry-route">
  <from uri="seda:retryRoute"/>
  <to uri="log:afterRetry"/>
</route>

この構成では、seda:retryRoute に渡した時点で Exchange がコピーされるため:
• UnitOfWork 内部の RedeliveryData が初期化された新しい Exchange が使われる
• retry カウントやフラグが一新され、Camel の挙動が「初回処理」として動く

メリット:
• Camel が自動的に Exchange をコピーしてくれる
• retry 状態を意識せずルート構成だけでリセットが可能
• 実装がシンプルで安全性が高い

デメリット:
• seda: は非同期キューを使うため、即時の順序性や同期制御が必要な場面では注意
• Exchange のコピーにはコストがあるため、大量高速処理ではチューニングが必要

補足:

seda: の代わりに vm:(同一JVM内)や direct:, direct-vm: などを使うと Exchange はコピーされず、そのまま渡されるため retry 状態も残る ことに注意してください。

方法④:リフレクションで RedeliveryData を強制的に null にする(非推奨)

Camel の Exchange は、リトライ情報を UnitOfWork の内部フィールドである redeliveryData に保持しています。
この情報は通常の getProperties()getHeaders() ではアクセスできず、Camel の retry ロジック内部でのみ使われます。

どうしても retry 状態を完全に強制的に初期化したい場合は、
Java のリフレクションを使って redeliveryDatanull にすることが可能です。

実装例(Java)

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.UnitOfWork;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;

@Component("forceClearRedeliveryProcessor")
public class ForceClearRedeliveryProcessor implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        UnitOfWork uow = exchange.getUnitOfWork();
        if (uow != null) {
            try {
                Field field = uow.getClass().getDeclaredField("redeliveryData");
                field.setAccessible(true);
                field.set(uow, null);
                System.out.println("[RedeliveryData] forcibly cleared.");
            } catch (NoSuchFieldException | IllegalAccessException e) {
                System.err.println("[RedeliveryData] could not be cleared: " + e.getMessage());
            }
        }
    }
}

この Processor をルートの途中に差し込むことで、Camel の retry 状態は内部的に完全リセットされます。

メリット:
• Exchange 自体を再生成せずに Camel の内部 retry 状態を完全にリセットできる
• ヘッダーやプロパティの復元も不要

デメリット:
• Camel のバージョンアップでフィールド名が変わると壊れる可能性が高い
• 保守性が低く、本番環境での使用は 非推奨
• JDK のセキュリティ設定次第でリフレクションが制限される可能性がある

注意

この方法はあくまで デバッグ・テスト・検証用途とし、
本番環境で使う場合は **方法①(Exchange 新規作成)や方法③(seda 経由)**を推奨します。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?