はじめに
Apache Camel を使って Kafka にメッセージを送信する際、
例外や送信結果(RecordMetadata)の null などに対して適切にリトライ処理を実装することは、信頼性の高いシステム構築に欠かせません。
本記事では、以下のような構成を実現します:
やりたいこと
- Kafka 送信時に
KafkaException
などの例外が発生したらキャッチしてリトライ - Kafka 送信成功時でも
RecordMetadata
がnull
または空の場合はリトライ - 例外は自前の
RetryableKafkaException
を使って明確に制御 - XML DSL + Spring Boot + Apache Camel で構成
使用技術
- Apache Camel 4.x
- Spring Boot 3.x
- Kafka
- XML DSL
ディレクトリ構成(概要)
src/
├─ main/
│ ├─ java/com/example/
│ │ ├─ KafkaSendApplication.java
│ │ ├─ exception/RetryableKafkaException.java
│ │ └─ processor/KafkaToRetryableExceptionMapper.java
│ ├─ resources/
│ │ ├─ application.properties
│ │ └─ camel-context.xml
pom.xml
第1章:application.properties(基本設定)
camel.component.kafka.brokers=localhost:9092
retry.max=3
retry.delay.ms=2000
第2章:pom.xml(Maven依存関係)
Apache Camel と Kafka を連携させるには、以下の依存関係を pom.xml
に追加します。
<dependencies>
<!-- Apache Camel のコアライブラリ -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>4.8.0</version>
</dependency>
<!-- Kafka コンポーネント -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>4.8.0</version>
</dependency>
<!-- Spring Boot との連携用 -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>
第3章:RetryableKafkaException(自前例外クラス)
Kafka送信時の失敗や、送信後に RecordMetadata
が存在しない(=隠れた失敗)ケースを検出した際に、
明示的にリトライ対象として扱うための 自前の例外クラス を作成します。
このクラスを使って例外の意味を明確にし、Camel の onException
によるリトライ制御を容易にします。
RetryableKafkaException.java
package com.example.exception;
/**
* Kafka送信失敗時に再送を試みるための明示的な例外クラス。
*/
public class RetryableKafkaException extends RuntimeException {
public RetryableKafkaException(String message) {
super(message);
}
public RetryableKafkaException(String message, Throwable cause) {
super(message, cause);
}
}
第4章:KafkaToRetryableExceptionMapper(例外変換プロセッサ)
Kafkaへのメッセージ送信時に KafkaException
が発生した場合、
それをそのまま扱うのではなく、自前の RetryableKafkaException
にラップして再送出することで、
後段の onException
でリトライ制御を統一的に扱えるようにします。
そのためのプロセッサがこの KafkaToRetryableExceptionMapper
です。
KafkaToRetryableExceptionMapper.java
package com.example.processor;
import com.example.exception.RetryableKafkaException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
/**
* Kafka送信時のKafkaExceptionを、RetryableKafkaExceptionに変換するProcessor。
*/
public class KafkaToRetryableExceptionMapper implements Processor {
@Override
public void process(Exchange exchange) {
// Kafka送信時に発生した例外を取得
Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
// 自前のリトライ対象例外に変換して送出
throw new RetryableKafkaException("Kafka send failed", cause);
}
}
第5章:camel-context.xml(ルート定義)
Kafka送信処理の中心となる Camel のルート定義を XML DSL で記述します。
ここでは以下のポイントを重視しています:
- Kafka送信中に
KafkaException
が発生したら自前のリトライ可例外に変換 - Kafka送信成功後でも
RecordMetadata
が null または空なら明示的に失敗扱い - それらの失敗はすべて
RetryableKafkaException
として統一的に扱い、再送処理へ
camel-context.xml
<routes xmlns="http://camel.apache.org/schema/spring">
<!-- RetryableKafkaException をリトライ対象に定義(最大3回、2秒間隔) -->
<onException>
<exception>com.example.exception.RetryableKafkaException</exception>
<redeliveryPolicy maximumRedeliveries="{{retry.max}}" redeliveryDelay="{{retry.delay.ms}}" />
<log message="RetryableKafkaException caught: ${exception.message}" loggingLevel="WARN" />
<handled>
<constant>true</constant>
</handled>
</onException>
<!-- Kafka送信ルート -->
<route id="kafka-send-route">
<from uri="direct:kafkaSend" />
<log message="Sending to Kafka: ${body}" />
<!-- Kafka送信中にKafkaExceptionが発生したら、自前例外に変換 -->
<onException>
<exception>org.apache.camel.component.kafka.KafkaException</exception>
<handled><constant>true</constant></handled>
<process>
<bean class="com.example.processor.KafkaToRetryableExceptionMapper"/>
</process>
</onException>
<!-- Kafka送信 -->
<to uri="kafka:my-topic" />
<!-- Kafkaの送信結果を検証:RecordMetadata が無いなら失敗扱い -->
<choice>
<when>
<or>
<simple>${header.CamelKafkaRecordMeta} == null</simple>
<simple>${header.CamelKafkaRecordMeta} == ''</simple>
</or>
<throwException exceptionType="com.example.exception.RetryableKafkaException">
<message>Kafka RecordMetadata is missing — send failed</message>
</throwException>
</when>
<otherwise>
<log message="Kafka send succeeded. Metadata: ${header.CamelKafkaRecordMeta}" />
</otherwise>
</choice>
</route>
<!-- 起動時に1回だけメッセージを送信するルート -->
<route id="test-trigger">
<from uri="timer:test?repeatCount=1" />
<setBody>
<constant>{"event":"test"}</constant>
</setBody>
<to uri="direct:kafkaSend" />
</route>
</routes>
第6章:KafkaSendApplication(起動クラス)
このアプリケーションは Spring Boot ベースで構成されており、
main()
メソッドから Camel のルートが自動起動されます。
KafkaSendApplication.java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Kafka送信アプリケーションのエントリーポイント。
* Camel のルート(camel-context.xml)が自動で読み込まれます。
*/
@SpringBootApplication
public class KafkaSendApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaSendApplication.class, args);
}
}
第7章:動作結果(ログ)
この構成でアプリケーションを起動すると、Camel ルートが1回だけ実行され、Kafka にテストメッセージが送信されます。
正常に送信された場合、以下のようなログが出力されます:
Sending to Kafka: {“event”:“test”}
Kafka send succeeded. Metadata: KafkaProducerRecordMetadata(topic=‘my-topic’, partition=0, offset=123)
送信失敗時(例:Kafka ブローカーが起動していない場合)
Sending to Kafka: {“event”:“test”}
RetryableKafkaException caught: Kafka send failed
RetryableKafkaException caught: Kafka send failed
RetryableKafkaException caught: Kafka send failed
上記のように
onException
によって最大3回まで自動でリトライされます(retry.max=3
の設定)。
メタ情報なし(RecordMetadata が null or 空)の場合
Sending to Kafka: {“event”:“test”}
RetryableKafkaException caught: Kafka RecordMetadata is missing — send failed
このように Camel のルート上で Kafka 送信処理を安全にハンドリングできていることが確認できます。