Receiving Events from Multiple Kafka Topics with Axon Framework [Kotlin, SpringBoot]

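Introduction

In a previous article, I described trying out an implementation that receives events from Kafka using Axon Framework (hereafter "Axon").

At that point, the application could receive events from a single Kafka topic specified as default-topic, as shown below.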

src/main/resources/application.yml
axon:
  axonserver:
    enabled: false
  serializer:
    general: jackson
    events: jackson
  kafka:
    bootstrap-servers: localhost:9094
    consumer:
      event-processor-mode: tracking
    default-topic: doc-topic # receives from doc-topic only

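This article describes the configuration needed for a single consumer application to receive events from multiple Kafka topics.

Adding the configuration

Here, suppose we want to receive events from two topics, doc-topic and default-topic.

Using the documentation as a reference, let's create a Config class.

Rather than copying the documentation's sample code as is, I'll deliberately start with the bare minimum.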

src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class AxonKafkaConsumerConfig {
    private val topics: List<String> = listOf("doc-topic", "default-topic")

    @Bean
    fun streamableKafkaMessageSource(): StreamableKafkaMessageSource<String, ByteArray> =
        StreamableKafkaMessageSource.builder<String, ByteArray>()
            .topics(topics)
            .build()
}

I also add an EventHandler for the OtherCreated event, which is received from the default-topic topic.

src/main/kotlin/org/example/axonkafkatrial/consumer/listener/ConsumerProcess.kt
package org.example.axonkafkatrial.consumer.listener

import org.axonframework.eventhandling.EventHandler
import org.example.axonkafkatrial.shared.event.DocCreated
import org.example.axonkafkatrial.shared.event.OtherCreated
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

@Component
class ConsumerProcess {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    @EventHandler
    fun on(event: DocCreated) {
        logger.info("event received: $event")
    }

    // Added
    @EventHandler
    fun on(event: OtherCreated) {
        logger.info("event received: $event")
    }
}

Starting the application in this state failed with the following error.

INFO 83833 --- [           main] .s.b.a.l.ConditionEvaluationReportLogger : 

Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
2024-11-10T13:49:36.233+09:00 ERROR 83833 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'axonKafkaTrackingConfig': Unsatisfied dependency expressed through method 'configure' parameter 1: Error creating bean with name 'streamableKafkaMessageSource' defined in class path resource [org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.class]: Failed to instantiate [org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource]: Factory method 'streamableKafkaMessageSource' threw exception with message: The ConsumerFactory is a hard requirement and should be provided
	at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.resolveMethodArguments(AutowiredAnnotationBeanPostProcessor.java:896) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.inject(AutowiredAnnotationBeanPostProcessor.java:849) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:145) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessProperties(AutowiredAnnotationBeanPostProcessor.java:509) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1439) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:599) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:975) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:971) ~[spring-context-6.1.14.jar:6.1.14]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:625) ~[spring-context-6.1.14.jar:6.1.14]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.example.axonkafkatrial.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:12) ~[main/:na]
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'streamableKafkaMessageSource' defined in class path resource [org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.class]: Failed to instantiate [org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource]: Factory method 'streamableKafkaMessageSource' threw exception with message: The ConsumerFactory is a hard requirement and should be provided
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:648) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:485) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1355) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1185) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.resolveMethodArguments(AutowiredAnnotationBeanPostProcessor.java:888) ~[spring-beans-6.1.14.jar:6.1.14]
	... 20 common frames omitted
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource]: Factory method 'streamableKafkaMessageSource' threw exception with message: The ConsumerFactory is a hard requirement and should be provided
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:178) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:644) ~[spring-beans-6.1.14.jar:6.1.14]
	... 33 common frames omitted
Caused by: org.axonframework.common.AxonConfigurationException: The ConsumerFactory is a hard requirement and should be provided
	at org.axonframework.common.BuilderUtils.lambda$assertThat$0(BuilderUtils.java:48) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.common.Assert.assertThat(Assert.java:115) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.common.BuilderUtils.assertThat(BuilderUtils.java:48) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.common.BuilderUtils.assertNonNull(BuilderUtils.java:62) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource$Builder.validate(StreamableKafkaMessageSource.java:313) ~[axon-kafka-4.10.0.jar:4.10.0]
	at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource.<init>(StreamableKafkaMessageSource.java:86) ~[axon-kafka-4.10.0.jar:4.10.0]
	at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource$Builder.build(StreamableKafkaMessageSource.java:302) ~[axon-kafka-4.10.0.jar:4.10.0]
	at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig.streamableKafkaMessageSource(AxonKafkaConsumerConfig.kt:29) ~[main/:na]
	at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$0.CGLIB$streamableKafkaMessageSource$0(<generated>) ~[main/:na]
	at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$FastClass$$1.invoke(<generated>) ~[main/:na]
	at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:258) ~[spring-core-6.1.14.jar:6.1.14]
	at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:348) ~[spring-context-6.1.14.jar:6.1.14]
	at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$0.streamableKafkaMessageSource(<generated>) ~[main/:na]
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:146) ~[spring-beans-6.1.14.jar:6.1.14]
	... 34 common frames omitted

Process finished with exit code 1

As the following log line shows, specifying a ConsumerFactory appears to be mandatory.

The ConsumerFactory is a hard requirement and should be provided

A Fetcher appears to be required as well.

Looking more closely at the documentation, the sample code for SubscribableKafkaMessageSource marks ConsumerFactory and Fetcher as "Hard requirement".

What I'm trying here is StreamableKafkaMessageSource, whose sample code has no "Hard requirement" comments, but the same presumably applies.

    public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(List<String> topics,
                                                                                         String groupId,
                                                                                         ConsumerFactory<String, byte[]> consumerFactory,
                                                                                         Fetcher<String, byte[], EventMessage<?>> fetcher,
                                                                                         KafkaMessageConverter<String, byte[]> messageConverter,
                                                                                         int consumerCount,
                                                                                         KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
        SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<String, byte[]>builder()
                .topics(topics)                     // Defaults to a collection of "Axon.Events"
                .groupId(groupId)                   // Hard requirement
                .consumerFactory(consumerFactory)   // Hard requirement
                .fetcher(fetcher)                   // Hard requirement
                .messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
                .consumerCount(consumerCount)       // Defaults to a single Consumer
                .build();
        // Registering the source is required to tie into the Configurers lifecycle to start the source at the right stage
        kafkaMessageSourceConfigurer.registerSubscribableSource(configuration -> subscribableKafkaMessageSource);
        return subscribableKafkaMessageSource;
    }

So I modified the class as follows.

src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class AxonKafkaConsumerConfig {
    private val topics: List<String> = listOf("doc-topic", "default-topic")

    @Bean
    fun streamableKafkaMessageSource(
        consumerFactory: ConsumerFactory<String, ByteArray>, // Added
        fetcher: Fetcher<String, ByteArray, KafkaEventMessage>, // Added
    ): StreamableKafkaMessageSource<String, ByteArray> =
        StreamableKafkaMessageSource.builder<String, ByteArray>()
            .topics(topics)
            .consumerFactory(consumerFactory) // Added
            .fetcher(fetcher) // Added
            .build()
}

With this change the application started, but judging from the log, the Serializer in use was XStreamSerializer rather than the JacksonSerializer specified in application.yml, and event reception did not work as expected.

2024-11-10T13:57:16.526+09:00  WARN 84623 --- [           main] a.e.k.e.c.s.StreamableKafkaMessageSource : The default XStreamSerializer is used, whereas it is strongly recommended to configure the security context of the XStream instance.

org.axonframework.common.AxonConfigurationException: A default XStreamSerializer is used, without specifying the security context
	at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource$Builder.validate(StreamableKafkaMessageSource.java:316) ~[axon-kafka-4.10.0.jar:4.10.0]
	at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource.<init>(StreamableKafkaMessageSource.java:86) ~[axon-kafka-4.10.0.jar:4.10.0]
	at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource$Builder.build(StreamableKafkaMessageSource.java:302) ~[axon-kafka-4.10.0.jar:4.10.0]
	at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig.streamableKafkaMessageSource(AxonKafkaConsumerConfig.kt:29) ~[main/:na]
	at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$0.CGLIB$streamableKafkaMessageSource$0(<generated>) ~[main/:na]
	at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$FastClass$$1.invoke(<generated>) ~[main/:na]
	at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:258) ~[spring-core-6.1.14.jar:6.1.14]
	at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:348) ~[spring-context-6.1.14.jar:6.1.14]
	at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$0.streamableKafkaMessageSource(<generated>) ~[main/:na]
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:146) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:644) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:636) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1355) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1185) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.resolveMethodArguments(AutowiredAnnotationBeanPostProcessor.java:888) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.inject(AutowiredAnnotationBeanPostProcessor.java:849) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:145) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessProperties(AutowiredAnnotationBeanPostProcessor.java:509) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1439) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:599) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:975) ~[spring-beans-6.1.14.jar:6.1.14]
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:971) ~[spring-context-6.1.14.jar:6.1.14]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:625) ~[spring-context-6.1.14.jar:6.1.14]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.example.axonkafkatrial.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:12) ~[main/:na]

When the source is configured in a Config class, it seems the Serializer set in application.yml is not used unless it is also specified explicitly.

I modified the class as follows.

src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class AxonKafkaConsumerConfig {
    private val topics: List<String> = listOf("doc-topic", "default-topic")

    @Bean
    fun streamableKafkaMessageSource(
        consumerFactory: ConsumerFactory<String, ByteArray>,
        fetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
        serializer: Serializer, // Added
    ): StreamableKafkaMessageSource<String, ByteArray> =
        StreamableKafkaMessageSource.builder<String, ByteArray>()
            .topics(topics)
            .consumerFactory(consumerFactory)
            .fetcher(fetcher)
            .serializer(serializer) // Added
            .build()
}

With this, the application started successfully, and I confirmed that events from both doc-topic and default-topic were received and processed.

INFO 85549 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=4971f167-7845-40ff-905e-43b8583c9082, body=test)
INFO 85549 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: OtherCreated(id=256ee3b7-1254-4949-950b-719656602ffc, body=test)

The token column of the token_entry table now records the offset for each topic partition, as shown below.

{"positions":{"doc-topic-0":1,"default-topic-0":1}}
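
To make the token easier to read, here is a minimal sketch that splits the JSON above into topic, partition, and offset. It uses only the Kotlin standard library; parsePositions is a hypothetical helper introduced for illustration and is not part of Axon. It also assumes each key has the form "<topic>-<partition>", as in the value above. Real code would more likely deserialize the token with a JSON library.

```kotlin
// Hypothetical helper (not an Axon API): extracts "topic-partition" -> offset
// pairs from the token JSON with a simple regex. Illustration only.
fun parsePositions(tokenJson: String): Map<String, Long> {
    // Matches entries of the form "some-key":123
    val entry = Regex("\"([^\"]+)\"\\s*:\\s*(\\d+)")
    return entry.findAll(tokenJson)
        .associate { it.groupValues[1] to it.groupValues[2].toLong() }
}

fun main() {
    val token = """{"positions":{"doc-topic-0":1,"default-topic-0":1}}"""
    parsePositions(token).forEach { (key, offset) ->
        // Assumption: the text after the last '-' is the partition number.
        val topic = key.substringBeforeLast('-')
        val partition = key.substringAfterLast('-').toInt()
        println("topic=$topic partition=$partition offset=$offset")
    }
}
```

Splitting on the last hyphen matters here because the topic names themselves contain hyphens.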

Bonus

The list of topics can also be supplied through application.yml, for example as follows.

src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class AxonKafkaConsumerConfig {
    @Value("\${application.kafka.topics}")
    private val topics: List<String>? = null

    @Bean
    fun streamableKafkaMessageSource(
        consumerFactory: ConsumerFactory<String, ByteArray>,
        fetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
        serializer: Serializer,
    ): StreamableKafkaMessageSource<String, ByteArray> =
        StreamableKafkaMessageSource.builder<String, ByteArray>()
            .topics(topics)
            .consumerFactory(consumerFactory)
            .fetcher(fetcher)
            .serializer(serializer)
            .build()
}

src/main/resources/application.yml
# Add the following
application:
  kafka:
    topics: doc-topic, default-topic
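
The topics value above is a single comma-separated string, not a YAML list. As far as I understand, Spring's default type conversion splits such a string on commas and trims each element when the injection target is a List<String>. The sketch below only imitates that behavior in plain Kotlin to show the resulting list. toTopicList is a hypothetical stand-in, not Spring's actual code.

```kotlin
// Hypothetical stand-in for the String -> List<String> conversion.
// Splits on commas and trims surrounding whitespace from each element.
fun toTopicList(raw: String): List<String> =
    raw.split(",").map { it.trim() }

fun main() {
    // Same value as application.kafka.topics in application.yml
    println(toTopicList("doc-topic, default-topic"))
}
```

So the space after the comma in the YAML value should not end up in the topic name.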

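Summary

I confirmed how to configure Axon to receive events from multiple Kafka topics.

When the configuration is written in a Config class, especially in combination with application.yml settings, the Bean has to take the relevant components as parameters and set them on the builder explicitly.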
