0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Apache Camel × Kafka バッチ受信と <split> の挙動を完全理解する

Last updated at Posted at 2025-04-17

Apache camel を使って Kafka バッチ受信処理を実装する際、Exchangeの構造とスコープの切り替わりを理解することは非常に重要です。

この記事では、Kafka からメッセージを バッチで受信した場合の Exchange の構造と、それを <split> で分割したあとの挙動について、詳しく整理し、Camel におけるスコープの切り替わりの理解を深めます。

1. Kafka バッチ受信時の Exchange 構造

Kafka から 5 件のメッセージをバッチで受信した場合、Camel の Exchange は以下のような構造になります。

Exchange(親 Exchange)
├── Properties:
│   └── (なし)
├── Headers:
│   ├── kafka.TOPIC       : “your-topic”
│   ├── kafka.PARTITION   : -1
│   ├── kafka.OFFSET      : -1
│   └── その他 Kafka 全体に関する情報(基本使わない)
└── Body:
└── List
├── Exchange[0]
│   ├── Headers:
│   │   ├── kafka.TOPIC     : “your-topic”
│   │   ├── kafka.OFFSET    : 100
│   │   └── …
│   └── Body: “message-0”
├── Exchange[1]
│   └── …
├── Exchange[2]
│   └── …
├── Exchange[3]
│   └── …
└── Exchange[4]
└── …

各 Kafka レコードは独立した Exchange として List<Exchange> に格納されており、それが親 Exchange の body にセットされます。
この時点ではまだ <split> は適用されていません。

2. <split> を使うとどうなるか?

Kafka バッチ受信後の Exchange に対して <split> を使うと、Camel は body(List<Exchange>)から 1 件ずつ Kafka レコードの Exchange を取り出し、個別に処理を行います。

<split>
  <simple>${body}</simple>
  ...
</split>

このときの Exchange 構造は以下のように変化します:

Exchange(split後の子 Exchange)
├── Properties:
│   ├── CamelSplitIndex    : 0〜4
│   ├── CamelSplitSize     : 5
│   └── CamelSplitComplete : true(最後のみ)/ false(それ以外)
├── Headers:
│   └── 親 Exchange のヘッダーをコピー(Kafka 個別のヘッダーではない)
└── Body:
    └── Exchange[i](Kafka 1件分の Exchange オブジェクト)
        ├── Headers:
        │   ├── kafka.TOPIC       : "your-topic"
        │   ├── kafka.OFFSET      : 100 + i
        │   └── ...
        └── Body: "message-{i}"

この構造が示すように、split後の Exchange の body には、Kafka レコード1件分の Exchange オブジェクトがそのまま格納されています。
つまり、実際のメッセージ本文(value)や Kafka のヘッダーを使いたい場合は、body の中の Exchange からさらに取り出す必要があります。

3. スコープが切り替わるとは?

Camel における <split> の処理中は、ループ内のスコープ(文脈)における Exchange が切り替わります。この「スコープの切り替え」が Camel Kafka バッチ処理で混乱しやすいポイントの1つです。

スコープの違い

スコープ ${body} の正体 Kafkaメッセージ本文の取得例
split前(親) List<Exchange>(Kafkaバッチ) 使用しない(split に渡すだけ)
split中(子) Exchange[i](Kafka 1件分の Exchange) ${body.message.body}
split中のヘッダー 親 Exchange のヘッダー Kafka の個別ヘッダーは ${body.message.headers[...]}

重要なポイント

  • split後の Exchange の body は Exchange オブジェクト(Kafka レコード1件分)である
  • その中にある message.body が Kafka の value(本文)
  • Kafka のヘッダー情報も message.headers[...] の中にある
  • 親 Exchange のヘッダーは継承されるが、KafkaRecord のメタ情報は含まれない

よくある勘違いと実際の挙動

誤解 実際
${body} で Kafka メッセージ本体が取れる Exchange[i] なので、さらに .message.body で取り出す必要がある
${header.kafka.TOPIC} で Kafka トピックが取れる 親 Exchange のヘッダーしか参照できず、Kafka ヘッダーは見えない
Kafka の OFFSET が null になる ${body.message.headers[kafka.OFFSET]} でアクセスする必要がある

このように、split後のスコープでは Exchange の構造が1段深くなることを常に意識しておくと、Kafka バッチ処理時のトラブルを回避できます。

4. Java Processorクラスでの処理例

Kafka レコード 1 件分の Exchange から メッセージ本体(value)Kafka ヘッダー情報(topic、offset など) を取り出して処理したい場合、Camel の Processor クラスを使うと柔軟に対応できます。

以下は、split 処理中に使用する Java Processor の実装例です。

Processor 実装クラス

@Component
public class KafkaSplitProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        // split によって生成された Exchange の body には Exchange[i] が入っている
        Exchange inner = exchange.getMessage().getBody(Exchange.class);

        // Kafka メッセージ本体(value)
        String payload = inner.getMessage().getBody(String.class);

        // Kafka ヘッダー(必要に応じて取得)
        String topic = inner.getMessage().getHeader("kafka.TOPIC", String.class);
        Long offset = inner.getMessage().getHeader("kafka.OFFSET", Long.class);

        // 外側(ルート上の)Exchange に反映
        exchange.getMessage().setBody(payload);
        exchange.getMessage().setHeader("kafka.TOPIC", topic);
        exchange.getMessage().setHeader("kafka.OFFSET", offset);
    }
}

このように Processor を使うことで、KafkaRecord 単位のメタ情報や本文の抽出・加工・再設定が柔軟に行えます。

また、共通化して複数のルートで使い回すことも簡単にできます。

5. XML DSLでの処理例

Java DSL を使わない場合でも、Camel の XML DSL を用いて Kafka レコード1件ごとの本文やヘッダーを取り出すことが可能です。

Kafka バッチ受信後の <split> の中で、Exchange[i] の中にある本文(Kafka value)や Kafka ヘッダー情報を取り出して使うには、以下のように記述します。

サンプル:Kafka value とトピックを取り出してログ出力

<route id="kafka-split-xml">
  <from uri="kafka:your-topic?...&batching=true"/>

  <split>
    <simple>${body}</simple> <!-- Exchange[i] を1件ずつ取り出す -->

    <!-- Kafka メッセージ本体を取り出して再セット -->
    <setBody>
      <simple>${body.message.body}</simple>
    </setBody>

    <!-- Kafka のトピック名を取り出してログ出力用に設定 -->
    <setHeader name="kafka.TOPIC">
      <simple>${body.message.headers[kafka.TOPIC]}</simple>
    </setHeader>

    <to uri="log:split-message?showHeaders=true"/>
  </split>
</route>

6. よくある落とし穴

Kafka バッチ受信と <split> を使った実装では、Exchange の構造とスコープの理解が不十分なまま処理を書くと、想定外の挙動になりがちです。

以下は特によくある落とし穴とその実際の挙動です。

誤解・ミス 実際の挙動・対策
${body} で Kafka の本文(value)が取得できると思っていた 実際の ${body} は Kafka 1件分の Exchange オブジェクト。本文を取得するには ${body.message.body} が必要
${header.kafka.TOPIC} でトピック名が取得できると思っていた split後の Exchange のヘッダーは 親 Exchange からコピーされたもので、Kafka レコード個別のヘッダーは含まれていない${body.message.headers[kafka.TOPIC]} を使う必要がある
Kafka の OFFSET や KEY が null になってしまう それらも ${body.message.headers[...]} でアクセスする必要がある。${header.kafka.OFFSET} では正しく取得できない
split 内で log で body を出力したら DefaultExchange@xxxxxx と表示された split後の body は Exchange オブジェクトなので、直接出力すると toString() 結果が表示されてしまう。${body.message.body} を使えば中身が見える
Kafka の value を処理したつもりが空だった Exchange[i] を処理していることに気づかず、body の中の .message.body を参照しなかった

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?