はじめに
Axon Framework(以降、Axonと記述)を使用してKafkaからイベントを受信する実装を試した過程について、以下の記事で記載しました。
その際は、下記のようにdefault-topic
として指定した1つのKafkaトピックからイベントを受信できる状態でした。
axon:
axonserver:
enabled: false
serializer:
general: jackson
events: jackson
kafka:
bootstrap-servers: localhost:9094
consumer:
event-processor-mode: tracking
default-topic: doc-topic # doc-topicのみから受信
本記事では、1つのConsumerアプリケーションが複数のKafkaトピックからイベントを受信できるようにする場合の設定について記載します。
設定の追加
ここでは、doc-topic
とdefault-topic
の2つのトピックから受信できるようにしたいものとします。
ドキュメントの以下あたりを参考に、Configクラスを作成してみます。
ドキュメントに記載されたサンプルコードそのままでなく、あえて最小限の記述で試してみます。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class AxonKafkaConsumerConfig {
private val topics: List<String> = listOf("doc-topic", "default-topic")
@Bean
fun streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray> =
StreamableKafkaMessageSource.builder<String, ByteArray>()
.topics(topics)
.build()
}
また、default-topic
トピックから受信するOtherCreated
イベント用のEventHandlerを追加しておきます。
package org.example.axonkafkatrial.consumer.listener
import org.axonframework.eventhandling.EventHandler
import org.example.axonkafkatrial.shared.event.DocCreated
import org.example.axonkafkatrial.shared.event.OtherCreated
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
@Component
class ConsumerProcess {
private val logger = LoggerFactory.getLogger(this.javaClass)
@EventHandler
fun on(event: DocCreated) {
logger.info("event received: $event")
}
// 追加
@EventHandler
fun on(event: OtherCreated) {
logger.info("event received: $event")
}
}
この状態でアプリ起動すると、以下のエラーが発生して起動に失敗しました。
INFO 83833 --- [ main] .s.b.a.l.ConditionEvaluationReportLogger :
Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
2024-11-10T13:49:36.233+09:00 ERROR 83833 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'axonKafkaTrackingConfig': Unsatisfied dependency expressed through method 'configure' parameter 1: Error creating bean with name 'streamableKafkaMessageSource' defined in class path resource [org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.class]: Failed to instantiate [org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource]: Factory method 'streamableKafkaMessageSource' threw exception with message: The ConsumerFactory is a hard requirement and should be provided
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.resolveMethodArguments(AutowiredAnnotationBeanPostProcessor.java:896) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.inject(AutowiredAnnotationBeanPostProcessor.java:849) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:145) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessProperties(AutowiredAnnotationBeanPostProcessor.java:509) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1439) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:599) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:975) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:971) ~[spring-context-6.1.14.jar:6.1.14]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:625) ~[spring-context-6.1.14.jar:6.1.14]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.3.5.jar:3.3.5]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.5.jar:3.3.5]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.5.jar:3.3.5]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.5.jar:3.3.5]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363) ~[spring-boot-3.3.5.jar:3.3.5]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352) ~[spring-boot-3.3.5.jar:3.3.5]
at org.example.axonkafkatrial.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:12) ~[main/:na]
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'streamableKafkaMessageSource' defined in class path resource [org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.class]: Failed to instantiate [org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource]: Factory method 'streamableKafkaMessageSource' threw exception with message: The ConsumerFactory is a hard requirement and should be provided
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:648) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:485) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1355) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1185) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.resolveMethodArguments(AutowiredAnnotationBeanPostProcessor.java:888) ~[spring-beans-6.1.14.jar:6.1.14]
... 20 common frames omitted
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource]: Factory method 'streamableKafkaMessageSource' threw exception with message: The ConsumerFactory is a hard requirement and should be provided
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:178) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:644) ~[spring-beans-6.1.14.jar:6.1.14]
... 33 common frames omitted
Caused by: org.axonframework.common.AxonConfigurationException: The ConsumerFactory is a hard requirement and should be provided
at org.axonframework.common.BuilderUtils.lambda$assertThat$0(BuilderUtils.java:48) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.common.Assert.assertThat(Assert.java:115) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.common.BuilderUtils.assertThat(BuilderUtils.java:48) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.common.BuilderUtils.assertNonNull(BuilderUtils.java:62) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource$Builder.validate(StreamableKafkaMessageSource.java:313) ~[axon-kafka-4.10.0.jar:4.10.0]
at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource.<init>(StreamableKafkaMessageSource.java:86) ~[axon-kafka-4.10.0.jar:4.10.0]
at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource$Builder.build(StreamableKafkaMessageSource.java:302) ~[axon-kafka-4.10.0.jar:4.10.0]
at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig.streamableKafkaMessageSource(AxonKafkaConsumerConfig.kt:29) ~[main/:na]
at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$0.CGLIB$streamableKafkaMessageSource$0(<generated>) ~[main/:na]
at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$FastClass$$1.invoke(<generated>) ~[main/:na]
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:258) ~[spring-core-6.1.14.jar:6.1.14]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:348) ~[spring-context-6.1.14.jar:6.1.14]
at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$0.streamableKafkaMessageSource(<generated>) ~[main/:na]
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:146) ~[spring-beans-6.1.14.jar:6.1.14]
... 34 common frames omitted
Process finished with exit code 1
ログに以下が出ているとおり、ConsumerFactory
の指定は必須のようです。
The ConsumerFactory is a hard requirement and should be provided
また、Fetcherも同様に必須のようです。
ドキュメントをよく見ると、SubscribableKafkaMessageSource
を使う場合のサンプルコードには、 ConsumerFactory
やFetcher
が「Hard Requirement」として記載されていました。
いま試しているのは StreamableKafkaMessageSource
の方で、そちらのサンプルコードには「Hard Requirement」というコメントはありませんが、同じなのでしょう。
public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(List<String> topics,
String groupId,
ConsumerFactory<String, byte[]> consumerFactory,
Fetcher<String, byte[], EventMessage<?>> fetcher,
KafkaMessageConverter<String, byte[]> messageConverter,
int consumerCount,
KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<String, byte[]>builder()
.topics(topics) // Defaults to a collection of "Axon.Events"
.groupId(groupId) // Hard requirement
.consumerFactory(consumerFactory) // Hard requirement
.fetcher(fetcher) // Hard requirement
.messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
.consumerCount(consumerCount) // Defaults to a single Consumer
.build();
// Registering the source is required to tie into the Configurers lifecycle to start the source at the right stage
kafkaMessageSourceConfigurer.registerSubscribableSource(configuration -> subscribableKafkaMessageSource);
return subscribableKafkaMessageSource;
そこで、以下のように修正しました。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class AxonKafkaConsumerConfig {
private val topics: List<String> = listOf("doc-topic", "default-topic")
@Bean
fun streamableKafkaMessageSource(
consumerFactory: ConsumerFactory<String, ByteArray>, // 追加
fetcher: Fetcher<String, ByteArray, KafkaEventMessage>, // 追加
): StreamableKafkaMessageSource<String, ByteArray> =
StreamableKafkaMessageSource.builder<String, ByteArray>()
.topics(topics)
.consumerFactory(consumerFactory) // 追加
.fetcher(fetcher) // 追加
.build()
}
これでアプリは起動するようになりましたが、ログを見る限り、Serializerがapplication.ymlで指定したJacksonSerializer
でなくXStreamSerializer
になっており、イベント受信処理も期待通り動作しませんでした。
2024-11-10T13:57:16.526+09:00 WARN 84623 --- [ main] a.e.k.e.c.s.StreamableKafkaMessageSource : The default XStreamSerializer is used, whereas it is strongly recommended to configure the security context of the XStream instance.
org.axonframework.common.AxonConfigurationException: A default XStreamSerializer is used, without specifying the security context
at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource$Builder.validate(StreamableKafkaMessageSource.java:316) ~[axon-kafka-4.10.0.jar:4.10.0]
at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource.<init>(StreamableKafkaMessageSource.java:86) ~[axon-kafka-4.10.0.jar:4.10.0]
at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource$Builder.build(StreamableKafkaMessageSource.java:302) ~[axon-kafka-4.10.0.jar:4.10.0]
at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig.streamableKafkaMessageSource(AxonKafkaConsumerConfig.kt:29) ~[main/:na]
at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$0.CGLIB$streamableKafkaMessageSource$0(<generated>) ~[main/:na]
at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$FastClass$$1.invoke(<generated>) ~[main/:na]
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:258) ~[spring-core-6.1.14.jar:6.1.14]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:348) ~[spring-context-6.1.14.jar:6.1.14]
at org.example.axonkafkatrial.consumer.config.AxonKafkaConsumerConfig$$SpringCGLIB$$0.streamableKafkaMessageSource(<generated>) ~[main/:na]
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:146) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:644) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:636) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1355) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1185) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.resolveMethodArguments(AutowiredAnnotationBeanPostProcessor.java:888) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.inject(AutowiredAnnotationBeanPostProcessor.java:849) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:145) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessProperties(AutowiredAnnotationBeanPostProcessor.java:509) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1439) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:599) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:975) ~[spring-beans-6.1.14.jar:6.1.14]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:971) ~[spring-context-6.1.14.jar:6.1.14]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:625) ~[spring-context-6.1.14.jar:6.1.14]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.3.5.jar:3.3.5]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.5.jar:3.3.5]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.5.jar:3.3.5]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.5.jar:3.3.5]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363) ~[spring-boot-3.3.5.jar:3.3.5]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352) ~[spring-boot-3.3.5.jar:3.3.5]
at org.example.axonkafkatrial.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:12) ~[main/:na]
Configクラスで設定する場合、Serializerも明示的に指定しないと、application.yml側で設定したSerializerが使用されないようです。
以下のように修正しました。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class AxonKafkaConsumerConfig {
private val topics: List<String> = listOf("doc-topic", "default-topic")
@Bean
fun streamableKafkaMessageSource(
consumerFactory: ConsumerFactory<String, ByteArray>,
fetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
serializer: Serializer, // 追加
): StreamableKafkaMessageSource<String, ByteArray> =
StreamableKafkaMessageSource.builder<String, ByteArray>()
.topics(topics)
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.serializer(serializer) // 追加
.build()
}
これで、アプリ起動に成功し、 doc-topic
と default-topic
の両方からイベント受信して処理できていることが確認できました。
INFO 85549 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=4971f167-7845-40ff-905e-43b8583c9082, body=test)
INFO 85549 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: OtherCreated(id=256ee3b7-1254-4949-950b-719656602ffc, body=test)
token_entry
テーブルの token
カラムには、以下のようにトピックごとのオフセット状況が出力されるようになりました。
{"positions":{"doc-topic-0":1,"default-topic-0":1}}
おまけ
トピックのリストは、例えば以下のようにすれば、application.yml経由で設定することも可能です。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class AxonKafkaConsumerConfig {
@Value("\${application.kafka.topics}")
private val topics: List<String>? = null
@Bean
fun streamableKafkaMessageSource(
consumerFactory: ConsumerFactory<String, ByteArray>,
fetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
serializer: Serializer,
): StreamableKafkaMessageSource<String, ByteArray> =
StreamableKafkaMessageSource.builder<String, ByteArray>()
.topics(topics)
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.serializer(serializer)
.build()
}
# 以下を追加
application:
kafka:
topics: doc-topic, default-topic
まとめ
AxonでKafkaの複数トピックからイベントを受信するための設定方法を確認しました。
Configクラスで設定を記述する場合、特にapplication.ymlによる設定と組み合わせて使用するときは、適切な項目を引数で取得して設定するようBeanを記述する必要があることがわかりました。