Apache camel を使って Kafka バッチ受信処理を実装する際、Exchangeの構造とスコープの切り替わりを理解することは非常に重要です。
この記事では、Kafka からメッセージを バッチで受信した場合の Exchange の構造と、それを <split>
で分割したあとの挙動について、詳しく整理し、Camel におけるスコープの切り替わりの理解を深めます。
1. Kafka バッチ受信時の Exchange 構造
Kafka から 5 件のメッセージをバッチで受信した場合、Camel の Exchange は以下のような構造になります。
Exchange(親 Exchange)
├── Properties:
│ └── (なし)
├── Headers:
│ ├── kafka.TOPIC : “your-topic”
│ ├── kafka.PARTITION : -1
│ ├── kafka.OFFSET : -1
│ └── その他 Kafka 全体に関する情報(基本使わない)
└── Body:
└── List
├── Exchange[0]
│ ├── Headers:
│ │ ├── kafka.TOPIC : “your-topic”
│ │ ├── kafka.OFFSET : 100
│ │ └── …
│ └── Body: “message-0”
├── Exchange[1]
│ └── …
├── Exchange[2]
│ └── …
├── Exchange[3]
│ └── …
└── Exchange[4]
└── …
各 Kafka レコードは独立した Exchange として List<Exchange>
に格納されており、それが親 Exchange の body にセットされます。
この時点ではまだ <split>
は適用されていません。
2. <split>
を使うとどうなるか?
Kafka バッチ受信後の Exchange に対して <split>
を使うと、Camel は body(List<Exchange>
)から 1 件ずつ Kafka レコードの Exchange を取り出し、個別に処理を行います。
<split>
<simple>${body}</simple>
...
</split>
このときの Exchange 構造は以下のように変化します:
Exchange(split後の子 Exchange)
├── Properties:
│ ├── CamelSplitIndex : 0〜4
│ ├── CamelSplitSize : 5
│ └── CamelSplitComplete : true(最後のみ)/ false(それ以外)
├── Headers:
│ └── 親 Exchange のヘッダーをコピー(Kafka 個別のヘッダーではない)
└── Body:
└── Exchange[i](Kafka 1件分の Exchange オブジェクト)
├── Headers:
│ ├── kafka.TOPIC : "your-topic"
│ ├── kafka.OFFSET : 100 + i
│ └── ...
└── Body: "message-{i}"
この構造が示すように、split後の Exchange の body には、Kafka レコード1件分の Exchange オブジェクトがそのまま格納されています。
つまり、実際のメッセージ本文(value)や Kafka のヘッダーを使いたい場合は、body の中の Exchange からさらに取り出す必要があります。
3. スコープが切り替わるとは?
Camel における <split>
の処理中は、ループ内のスコープ(文脈)における Exchange が切り替わります。この「スコープの切り替え」が Camel Kafka バッチ処理で混乱しやすいポイントの1つです。
スコープの違い
スコープ |
${body} の正体 |
Kafkaメッセージ本文の取得例 |
---|---|---|
split前(親) |
List<Exchange> (Kafkaバッチ) |
使用しない(split に渡すだけ) |
split中(子) |
Exchange[i] (Kafka 1件分の Exchange) |
${body.message.body} |
split中のヘッダー | 親 Exchange のヘッダー | Kafka の個別ヘッダーは ${body.message.headers[...]}
|
重要なポイント
- split後の Exchange の body は Exchange オブジェクト(Kafka レコード1件分)である
- その中にある
message.body
が Kafka の value(本文) - Kafka のヘッダー情報も
message.headers[...]
の中にある - 親 Exchange のヘッダーは継承されるが、KafkaRecord のメタ情報は含まれない
よくある勘違いと実際の挙動
誤解 | 実際 |
---|---|
${body} で Kafka メッセージ本体が取れる |
Exchange[i] なので、さらに .message.body で取り出す必要がある |
${header.kafka.TOPIC} で Kafka トピックが取れる |
親 Exchange のヘッダーしか参照できず、Kafka ヘッダーは見えない |
Kafka の OFFSET が null になる |
${body.message.headers[kafka.OFFSET]} でアクセスする必要がある |
このように、split後のスコープでは Exchange の構造が1段深くなることを常に意識しておくと、Kafka バッチ処理時のトラブルを回避できます。
4. Java Processorクラスでの処理例
Kafka レコード 1 件分の Exchange から メッセージ本体(value) と Kafka ヘッダー情報(topic、offset など) を取り出して処理したい場合、Camel の Processor
クラスを使うと柔軟に対応できます。
以下は、split 処理中に使用する Java Processor の実装例です。
Processor 実装クラス
@Component
public class KafkaSplitProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
// split によって生成された Exchange の body には Exchange[i] が入っている
Exchange inner = exchange.getMessage().getBody(Exchange.class);
// Kafka メッセージ本体(value)
String payload = inner.getMessage().getBody(String.class);
// Kafka ヘッダー(必要に応じて取得)
String topic = inner.getMessage().getHeader("kafka.TOPIC", String.class);
Long offset = inner.getMessage().getHeader("kafka.OFFSET", Long.class);
// 外側(ルート上の)Exchange に反映
exchange.getMessage().setBody(payload);
exchange.getMessage().setHeader("kafka.TOPIC", topic);
exchange.getMessage().setHeader("kafka.OFFSET", offset);
}
}
このように Processor を使うことで、KafkaRecord 単位のメタ情報や本文の抽出・加工・再設定が柔軟に行えます。
また、共通化して複数のルートで使い回すことも簡単にできます。
5. XML DSLでの処理例
Java DSL を使わない場合でも、Camel の XML DSL を用いて Kafka レコード1件ごとの本文やヘッダーを取り出すことが可能です。
Kafka バッチ受信後の <split>
の中で、Exchange[i] の中にある本文(Kafka value)や Kafka ヘッダー情報を取り出して使うには、以下のように記述します。
サンプル:Kafka value とトピックを取り出してログ出力
<route id="kafka-split-xml">
<from uri="kafka:your-topic?...&batching=true"/>
<split>
<simple>${body}</simple> <!-- Exchange[i] を1件ずつ取り出す -->
<!-- Kafka メッセージ本体を取り出して再セット -->
<setBody>
<simple>${body.message.body}</simple>
</setBody>
<!-- Kafka のトピック名を取り出してログ出力用に設定 -->
<setHeader name="kafka.TOPIC">
<simple>${body.message.headers[kafka.TOPIC]}</simple>
</setHeader>
<to uri="log:split-message?showHeaders=true"/>
</split>
</route>
6. よくある落とし穴
Kafka バッチ受信と <split>
を使った実装では、Exchange の構造とスコープの理解が不十分なまま処理を書くと、想定外の挙動になりがちです。
以下は特によくある落とし穴とその実際の挙動です。
誤解・ミス | 実際の挙動・対策 |
---|---|
${body} で Kafka の本文(value)が取得できると思っていた |
実際の ${body} は Kafka 1件分の Exchange オブジェクト。本文を取得するには ${body.message.body} が必要 |
${header.kafka.TOPIC} でトピック名が取得できると思っていた |
split後の Exchange のヘッダーは 親 Exchange からコピーされたもので、Kafka レコード個別のヘッダーは含まれていない。${body.message.headers[kafka.TOPIC]} を使う必要がある |
Kafka の OFFSET や KEY が null になってしまう |
それらも ${body.message.headers[...]} でアクセスする必要がある。${header.kafka.OFFSET} では正しく取得できない |
split 内で log で body を出力したら DefaultExchange@xxxxxx と表示された |
split後の body は Exchange オブジェクトなので、直接出力すると toString() 結果が表示されてしまう。${body.message.body} を使えば中身が見える |
Kafka の value を処理したつもりが空だった |
Exchange[i] を処理していることに気づかず、body の中の .message.body を参照しなかった |