spring-kafkaでjsonの型変換などの処理続行不可能なメッセージをskipしたい場合がある。以下はその場合に使えるかもしれない設定例。
build.gradle
plugins {
id 'org.springframework.boot' version '2.4.2'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '15'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
implementation 'com.fasterxml.jackson.core:jackson-databind'
}
test {
useJUnitPlatform()
}
まずプロパティでconsumerのdeserializerにorg.springframework.kafka.support.serializer.ErrorHandlingDeserializer
を設定し、そのdelete先をspring.kafka.properties.spring.deserializer.value.delegate.class
プロパティで指定する。これはjson変換クラスを指定する。こうするとjson変換エラーは単にスキップされる。
application.properties
spring.kafka.bootstrap-servers=localhost:19092
spring.kafka.consumer.group-id=myGroup
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
以下は動作検証用の適当なconsumer.
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaSampleApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaSampleApplication.class, args);
}
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleData {
String id;
int value;
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class JsonConsumer {
@KafkaListener(topics = "mytopic")
public void processMessage(SampleData content) {
System.out.println("content" + content);
}
}
とはいえ、実際にはログ出したいとかエラー発生時に特殊な処理したいとか色々あるので、もうちょい細かい設定が色々必要になる。