LoginSignup
0
0

More than 3 years have passed since last update.

spring-kafkaのconsumerでjson deserialize型変換エラーをスキップ

Posted at

spring-kafkaでjsonの型変換などの処理続行不可能なメッセージをskipしたい場合がある。以下はその場合に使えるかもしれない設定例。

build.gradle
plugins {
  id 'org.springframework.boot' version '2.4.2'
  id 'io.spring.dependency-management' version '1.0.11.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '15'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation 'org.springframework.kafka:spring-kafka'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.kafka:spring-kafka-test'
  implementation 'com.fasterxml.jackson.core:jackson-databind'
}
test {
  useJUnitPlatform()
}

まずプロパティでconsumerのdeserializerにorg.springframework.kafka.support.serializer.ErrorHandlingDeserializerを設定し、そのdelete先をspring.kafka.properties.spring.deserializer.value.delegate.classプロパティで指定する。これはjson変換クラスを指定する。こうするとjson変換エラーは単にスキップされる。

application.properties
spring.kafka.bootstrap-servers=localhost:19092
spring.kafka.consumer.group-id=myGroup

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

以下は動作検証用の適当なconsumer.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaSampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaSampleApplication.class, args);
    }
}

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleData {
    String id;
    int value;
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class JsonConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(SampleData content) {
        System.out.println("content" + content);
    }
}

とはいえ、実際にはログ出したいとかエラー発生時に特殊な処理したいとか色々あるので、もうちょい細かい設定が色々必要になる。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0