0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Apache Camel で Kafka 送信失敗をリトライする(例外検知 + メタ情報チェック付き)【Camel 4 / XML DSL】

Last updated at Posted at 2025-04-16

はじめに

Apache Camel を使って Kafka にメッセージを送信する際、
例外や送信結果(RecordMetadata)の null などに対して適切にリトライ処理を実装することは、信頼性の高いシステム構築に欠かせません。

本記事では、以下のような構成を実現します:

やりたいこと

  • Kafka 送信時に KafkaException などの例外が発生したらキャッチしてリトライ
  • Kafka 送信成功時でも RecordMetadatanull または空の場合はリトライ
  • 例外は自前の RetryableKafkaException を使って明確に制御
  • XML DSL + Spring Boot + Apache Camel で構成

使用技術

  • Apache Camel 4.x
  • Spring Boot 3.x
  • Kafka
  • XML DSL

ディレクトリ構成(概要)

src/
├─ main/
│ ├─ java/com/example/
│ │ ├─ KafkaSendApplication.java
│ │ ├─ exception/RetryableKafkaException.java
│ │ └─ processor/KafkaToRetryableExceptionMapper.java
│ ├─ resources/
│ │ ├─ application.properties
│ │ └─ camel-context.xml
pom.xml


第1章:application.properties(基本設定)

camel.component.kafka.brokers=localhost:9092
retry.max=3
retry.delay.ms=2000

第2章:pom.xml(Maven依存関係)

Apache Camel と Kafka を連携させるには、以下の依存関係を pom.xml に追加します。

<dependencies>
    <!-- Apache Camel のコアライブラリ -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>4.8.0</version>
    </dependency>

    <!-- Kafka コンポーネント -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-kafka</artifactId>
        <version>4.8.0</version>
    </dependency>

    <!-- Spring Boot との連携用 -->
    <dependency>
        <groupId>org.apache.camel.springboot</groupId>
        <artifactId>camel-spring-boot-starter</artifactId>
        <version>4.8.0</version>
    </dependency>
</dependencies>

第3章:RetryableKafkaException(自前例外クラス)

Kafka送信時の失敗や、送信後に RecordMetadata が存在しない(=隠れた失敗)ケースを検出した際に、
明示的にリトライ対象として扱うための 自前の例外クラス を作成します。

このクラスを使って例外の意味を明確にし、Camel の onException によるリトライ制御を容易にします。

RetryableKafkaException.java

package com.example.exception;

/**
 * Kafka送信失敗時に再送を試みるための明示的な例外クラス。
 */
public class RetryableKafkaException extends RuntimeException {

    public RetryableKafkaException(String message) {
        super(message);
    }

    public RetryableKafkaException(String message, Throwable cause) {
        super(message, cause);
    }
}

第4章:KafkaToRetryableExceptionMapper(例外変換プロセッサ)

Kafkaへのメッセージ送信時に KafkaException が発生した場合、
それをそのまま扱うのではなく、自前の RetryableKafkaException にラップして再送出することで、
後段の onException でリトライ制御を統一的に扱えるようにします。

そのためのプロセッサがこの KafkaToRetryableExceptionMapper です。

KafkaToRetryableExceptionMapper.java

package com.example.processor;

import com.example.exception.RetryableKafkaException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

/**
 * Kafka送信時のKafkaExceptionを、RetryableKafkaExceptionに変換するProcessor。
 */
public class KafkaToRetryableExceptionMapper implements Processor {

    @Override
    public void process(Exchange exchange) {
        // Kafka送信時に発生した例外を取得
        Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);

        // 自前のリトライ対象例外に変換して送出
        throw new RetryableKafkaException("Kafka send failed", cause);
    }
}

第5章:camel-context.xml(ルート定義)

Kafka送信処理の中心となる Camel のルート定義を XML DSL で記述します。
ここでは以下のポイントを重視しています:

  • Kafka送信中に KafkaException が発生したら自前のリトライ可例外に変換
  • Kafka送信成功後でも RecordMetadata が null または空なら明示的に失敗扱い
  • それらの失敗はすべて RetryableKafkaException として統一的に扱い、再送処理へ

camel-context.xml

<routes xmlns="http://camel.apache.org/schema/spring">

    <!-- RetryableKafkaException をリトライ対象に定義(最大3回、2秒間隔) -->
    <onException>
        <exception>com.example.exception.RetryableKafkaException</exception>
        <redeliveryPolicy maximumRedeliveries="{{retry.max}}" redeliveryDelay="{{retry.delay.ms}}" />
        <log message="RetryableKafkaException caught: ${exception.message}" loggingLevel="WARN" />
        <handled>
            <constant>true</constant>
        </handled>
    </onException>

    <!-- Kafka送信ルート -->
    <route id="kafka-send-route">
        <from uri="direct:kafkaSend" />
        <log message="Sending to Kafka: ${body}" />

        <!-- Kafka送信中にKafkaExceptionが発生したら、自前例外に変換 -->
        <onException>
            <exception>org.apache.camel.component.kafka.KafkaException</exception>
            <handled><constant>true</constant></handled>
            <process>
                <bean class="com.example.processor.KafkaToRetryableExceptionMapper"/>
            </process>
        </onException>

        <!-- Kafka送信 -->
        <to uri="kafka:my-topic" />

        <!-- Kafkaの送信結果を検証:RecordMetadata が無いなら失敗扱い -->
        <choice>
            <when>
                <or>
                    <simple>${header.CamelKafkaRecordMeta} == null</simple>
                    <simple>${header.CamelKafkaRecordMeta} == ''</simple>
                </or>
                <throwException exceptionType="com.example.exception.RetryableKafkaException">
                    <message>Kafka RecordMetadata is missing — send failed</message>
                </throwException>
            </when>
            <otherwise>
                <log message="Kafka send succeeded. Metadata: ${header.CamelKafkaRecordMeta}" />
            </otherwise>
        </choice>
    </route>

    <!-- 起動時に1回だけメッセージを送信するルート -->
    <route id="test-trigger">
        <from uri="timer:test?repeatCount=1" />
        <setBody>
            <constant>{"event":"test"}</constant>
        </setBody>
        <to uri="direct:kafkaSend" />
    </route>

</routes>

第6章:KafkaSendApplication(起動クラス)

このアプリケーションは Spring Boot ベースで構成されており、
main() メソッドから Camel のルートが自動起動されます。

KafkaSendApplication.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Kafka送信アプリケーションのエントリーポイント。
 * Camel のルート(camel-context.xml)が自動で読み込まれます。
 */
@SpringBootApplication
public class KafkaSendApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaSendApplication.class, args);
    }
}

第7章:動作結果(ログ)

この構成でアプリケーションを起動すると、Camel ルートが1回だけ実行され、Kafka にテストメッセージが送信されます。

正常に送信された場合、以下のようなログが出力されます:

Sending to Kafka: {“event”:“test”}
Kafka send succeeded. Metadata: KafkaProducerRecordMetadata(topic=‘my-topic’, partition=0, offset=123)

送信失敗時(例:Kafka ブローカーが起動していない場合)

Sending to Kafka: {“event”:“test”}
RetryableKafkaException caught: Kafka send failed
RetryableKafkaException caught: Kafka send failed
RetryableKafkaException caught: Kafka send failed

上記のように onException によって最大3回まで自動でリトライされます(retry.max=3 の設定)。

メタ情報なし(RecordMetadata が null or 空)の場合

Sending to Kafka: {“event”:“test”}
RetryableKafkaException caught: Kafka RecordMetadata is missing — send failed

このように Camel のルート上で Kafka 送信処理を安全にハンドリングできていることが確認できます。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?