0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Apache Camel で Kafka メッセージをバッチ受信し、split 処理後に手動コミットする(XML DSL / Camel 4)

Last updated at Posted at 2025-04-16

はじめに

Apache Camel を使って Kafka メッセージをバッチで受信し、
1件ずつ処理してから最後に手動でオフセットをコミットする実装方法を紹介します。

  • Apache Camel 4.x(オープンソース版)
  • Spring Boot 3.x
  • XML DSL
  • camel-kafka
  • 手動コミット(manual commit)

やりたいこと

  • Kafka から 最大 500 件のバッチ受信
  • split で 1 件ずつ処理(標準出力へログ出力)
  • 全件処理完了後に手動で Kafka offset を commit

プロジェクト構成

  • Spring Boot + Camel(XML DSL)
  • Kafka (localhost:9092)
  • Kafka 設定は application.properties に集中管理

1. application.properties

Kafka 関連の設定はすべて application.properties に定義しておくことで、Camel の XML DSL 側はスッキリ書けます。

camel.component.kafka.brokers=localhost:9092
camel.component.kafka.group-id=my-group
camel.component.kafka.auto-offset-reset=latest

camel.component.kafka.max-poll-records=500
camel.component.kafka.consumer-request-timeout-ms=3000
camel.component.kafka.poll-timeout-ms=3000

camel.component.kafka.auto-commit-enable=false
camel.component.kafka.allow-manual-commit=true

2. camel-context.xml

Kafka からバッチでメッセージを受信し、split で 1 件ずつ処理して、
最後に manualCommitProcessor でオフセットを手動コミットします。

<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="kafka-batch-consume-route">
        <!-- Kafkaトピックからメッセージを受信 -->
        <from uri="kafka:my-topic" />

        <!-- メッセージのバッチを分割して個別に処理 -->
        <split streaming="true">
            <simple>${body}</simple>
            <process ref="messageProcessor"/>
        </split>

        <!-- 全メッセージ処理後に手動でオフセットをコミット -->
        <process ref="manualCommitProcessor"/>
    </route>
</routes>

3. MessageProcessor.java

Kafka から受信したバッチメッセージを split EIP によって分割し、各メッセージを個別に処理するための Processor 実装です。

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.stereotype.Component;

@Component("messageProcessor")
public class MessageProcessor implements Processor {

    @Override
    public void process(Exchange exchange) {
        // 各メッセージの本体を取得
        Object body = exchange.getMessage().getBody();
        // メッセージの処理をここに実装
        System.out.println("Processing message: " + body);
    }
}

4. ManualCommitProcessor.java

KafkaManualCommit を使って Kafka の offset を明示的に commit する Processor を実装します。

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaManualCommit;
import org.springframework.stereotype.Component;

@Component("manualCommitProcessor")
public class ManualCommitProcessor implements Processor {

    @Override
    public void process(Exchange exchange) {
        KafkaManualCommit manualCommit = exchange.getMessage().getHeader(
            KafkaManualCommit.MANUAL_COMMIT, KafkaManualCommit.class);

        if (manualCommit != null) {
            manualCommit.commit();
            System.out.println("Kafka offset committed manually.");
        } else {
            System.out.println("ManualCommit not available.");
        }
    }
}

5. pom.xml の依存関係(Apache Camel OSS)

Apache Camel を使用する場合の Maven 依存関係は以下の通りです。

<dependencies>
    <!-- Apache Camel Core + Spring Boot Starter -->
    <dependency>
        <groupId>org.apache.camel.springboot</groupId>
        <artifactId>camel-spring-boot-starter</artifactId>
        <version>4.4.0</version> <!-- 最新の安定バージョンを指定 -->
    </dependency>

    <!-- Apache Camel Kafka コンポーネント -->
    <dependency>
        <groupId>org.apache.camel.springboot</groupId>
        <artifactId>camel-kafka-starter</artifactId>
        <version>4.4.0</version>
    </dependency>
</dependencies>

6. 起動クラス(KafkaBatchApp.java

Spring Boot アプリケーションのエントリーポイントとなるクラスです。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaBatchApp {
    public static void main(String[] args) {
        SpringApplication.run(KafkaBatchApp.class, args);
    }
}

7. 実行結果(ログ出力例)

Kafka に複数件のメッセージを送信すると、以下のようなログが出力されます:

Received message: {“event”:“test1”}
Received message: {“event”:“test2”}

Kafka offset committed manually.

解説

  • Received message: ...
    各メッセージが split によって個別に処理され、ログに出力されています。

  • Kafka offset committed manually.
    全てのメッセージ処理が完了した後、manualCommitProcessor によって Kafka のオフセットが手動でコミットされたことを示しています。

このように、Camel のルートは Kafka からのバッチメッセージを効率的に処理し、確実にオフセットを管理することができます。

8. まとめ

本記事では、Apache Camel を使用して、Kafka からのメッセージをバッチで受信し、各メッセージを個別に処理した後、手動でオフセットをコミットする方法を紹介しました。

主なポイント

  • バッチ受信: max-poll-records を使用して、最大500件のメッセージを一度に受信。

  • 個別処理: split を利用して、受信したバッチ内の各メッセージを1件ずつ処理。

  • 手動コミット: KafkaManualCommit を使用して、すべてのメッセージ処理が完了した後にオフセットを手動でコミット。

利点

  • 確実なメッセージ処理: 各メッセージを確実に処理してからオフセットをコミットすることで、メッセージの損失や重複処理を防止。

  • 柔軟なエラーハンドリング: 処理中のエラーに対して柔軟に対応可能。

  • 拡張性: JSON パース、フィルタリング、外部システムとの連携など、さらなる処理の追加が容易。

この構成は、信頼性の高いメッセージ処理が求められるシステムにおいて、非常に有効です。ぜひ、実際のプロジェクトで活用してみてください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?