はじめに
Apache Camel を使って Kafka メッセージをバッチで受信し、
1件ずつ処理してから最後に手動でオフセットをコミットする実装方法を紹介します。
- Apache Camel 4.x(オープンソース版)
- Spring Boot 3.x
- XML DSL
- camel-kafka
- 手動コミット(manual commit)
やりたいこと
- Kafka から 最大 500 件のバッチ受信
-
split
で 1 件ずつ処理(標準出力へログ出力) - 全件処理完了後に手動で Kafka offset を commit
プロジェクト構成
- Spring Boot + Camel(XML DSL)
- Kafka (localhost:9092)
- Kafka 設定は
application.properties
に集中管理
1. application.properties
Kafka 関連の設定はすべて application.properties
に定義しておくことで、Camel の XML DSL 側はスッキリ書けます。
camel.component.kafka.brokers=localhost:9092
camel.component.kafka.group-id=my-group
camel.component.kafka.auto-offset-reset=latest
camel.component.kafka.max-poll-records=500
camel.component.kafka.consumer-request-timeout-ms=3000
camel.component.kafka.poll-timeout-ms=3000
camel.component.kafka.auto-commit-enable=false
camel.component.kafka.allow-manual-commit=true
2. camel-context.xml
Kafka からバッチでメッセージを受信し、split
で 1 件ずつ処理して、
最後に manualCommitProcessor
でオフセットを手動コミットします。
<routes xmlns="http://camel.apache.org/schema/spring">
<route id="kafka-batch-consume-route">
<!-- Kafkaトピックからメッセージを受信 -->
<from uri="kafka:my-topic" />
<!-- メッセージのバッチを分割して個別に処理 -->
<split streaming="true">
<simple>${body}</simple>
<process ref="messageProcessor"/>
</split>
<!-- 全メッセージ処理後に手動でオフセットをコミット -->
<process ref="manualCommitProcessor"/>
</route>
</routes>
3. MessageProcessor.java
Kafka から受信したバッチメッセージを split
EIP によって分割し、各メッセージを個別に処理するための Processor
実装です。
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.stereotype.Component;
@Component("messageProcessor")
public class MessageProcessor implements Processor {
@Override
public void process(Exchange exchange) {
// 各メッセージの本体を取得
Object body = exchange.getMessage().getBody();
// メッセージの処理をここに実装
System.out.println("Processing message: " + body);
}
}
4. ManualCommitProcessor.java
KafkaManualCommit
を使って Kafka の offset を明示的に commit する Processor
を実装します。
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaManualCommit;
import org.springframework.stereotype.Component;
@Component("manualCommitProcessor")
public class ManualCommitProcessor implements Processor {
@Override
public void process(Exchange exchange) {
KafkaManualCommit manualCommit = exchange.getMessage().getHeader(
KafkaManualCommit.MANUAL_COMMIT, KafkaManualCommit.class);
if (manualCommit != null) {
manualCommit.commit();
System.out.println("Kafka offset committed manually.");
} else {
System.out.println("ManualCommit not available.");
}
}
}
5. pom.xml
の依存関係(Apache Camel OSS)
Apache Camel を使用する場合の Maven 依存関係は以下の通りです。
<dependencies>
<!-- Apache Camel Core + Spring Boot Starter -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>4.4.0</version> <!-- 最新の安定バージョンを指定 -->
</dependency>
<!-- Apache Camel Kafka コンポーネント -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-kafka-starter</artifactId>
<version>4.4.0</version>
</dependency>
</dependencies>
6. 起動クラス(KafkaBatchApp.java
)
Spring Boot アプリケーションのエントリーポイントとなるクラスです。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaBatchApp {
public static void main(String[] args) {
SpringApplication.run(KafkaBatchApp.class, args);
}
}
7. 実行結果(ログ出力例)
Kafka に複数件のメッセージを送信すると、以下のようなログが出力されます:
Received message: {“event”:“test1”}
Received message: {“event”:“test2”}
…
Kafka offset committed manually.
解説
-
Received message: ...
各メッセージがsplit
によって個別に処理され、ログに出力されています。 -
Kafka offset committed manually.
全てのメッセージ処理が完了した後、manualCommitProcessor
によって Kafka のオフセットが手動でコミットされたことを示しています。
このように、Camel のルートは Kafka からのバッチメッセージを効率的に処理し、確実にオフセットを管理することができます。
8. まとめ
本記事では、Apache Camel を使用して、Kafka からのメッセージをバッチで受信し、各メッセージを個別に処理した後、手動でオフセットをコミットする方法を紹介しました。
主なポイント
-
バッチ受信:
max-poll-records
を使用して、最大500件のメッセージを一度に受信。 -
個別処理:
split
を利用して、受信したバッチ内の各メッセージを1件ずつ処理。 -
手動コミット:
KafkaManualCommit
を使用して、すべてのメッセージ処理が完了した後にオフセットを手動でコミット。
利点
-
確実なメッセージ処理: 各メッセージを確実に処理してからオフセットをコミットすることで、メッセージの損失や重複処理を防止。
-
柔軟なエラーハンドリング: 処理中のエラーに対して柔軟に対応可能。
-
拡張性: JSON パース、フィルタリング、外部システムとの連携など、さらなる処理の追加が容易。
この構成は、信頼性の高いメッセージ処理が求められるシステムにおいて、非常に有効です。ぜひ、実際のプロジェクトで活用してみてください。