Trying Spring Cloud Stream (routing each Kafka message to one of several outputs)



Introduction

Last time I tried a sample that connects Spring Cloud Stream to Kafka. This time I try multi output: receive a message from Kafka and route it to one of several outputs.


Environment

Same as last time.


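Steps

Starting from the previous sample, which sends the current time as a message once per second, add processing that checks whether the seconds value of the time is even or odd and routes the message to a topic for even values or a topic for odd values.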
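The routing rule described above can be sketched without any Spring or Kafka dependency. This is only an illustration of the even/odd decision: the class name ParityRouting and the method topicFor are hypothetical, and the two constants simply mirror the topic names configured later in this article.

```java
import java.time.LocalDateTime;

/** Minimal, Spring-free sketch of the even/odd routing decision. */
final class ParityRouting {

    // Hypothetical constants; the values mirror the topic names used later in this article.
    static final String EVEN_TOPIC = "spring.cloud.stream.test.even.topic";
    static final String ODD_TOPIC = "spring.cloud.stream.test.odd.topic";

    private ParityRouting() {
    }

    /** Picks the destination topic from the parity of the seconds field. */
    static String topicFor(LocalDateTime time) {
        return time.getSecond() % 2 == 0 ? EVEN_TOPIC : ODD_TOPIC;
    }

    public static void main(String[] args) {
        LocalDateTime sample = LocalDateTime.of(2015, 12, 5, 12, 10, 44);
        System.out.println(sample + " -> " + topicFor(sample));
    }
}
```

Keeping the decision in a pure function like this makes it easy to check on its own before wiring it to real channels.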


Create the Kafka connection project

See the previous article.

Note: the samples below reuse the project from the previous article.


Configure receiving messages from Kafka


Add the output topics to application.yml

Specify the topics to which the even and odd messages are written.

spring:
  cloud:
    stream:
      bindings:
        output: spring.cloud.stream.test.topic
        input: spring.cloud.stream.test.topic
        even: spring.cloud.stream.test.even.topic # even seconds
        odd: spring.cloud.stream.test.odd.topic # odd seconds


Create an interface that defines the channel for writing even messages


EvenOutput.java

package com.example;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface EvenOutput {
    String OUTPUT = "spring.cloud.stream.test.even.topic";

    @Output(OUTPUT)
    MessageChannel output();
}



Create an interface that defines the channel for writing odd messages


OddOutput.java

package com.example;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface OddOutput {
    String OUTPUT = "spring.cloud.stream.test.odd.topic";

    @Output(OUTPUT)
    MessageChannel output();
}



Modify the SampleSink created last time

Add the message-routing logic to the SampleSink created last time.

Previous state


SampleSink.java

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;

@EnableBinding(Sink.class)
public class SampleSink {

    private static final Logger logger = LoggerFactory.getLogger(SampleSink.class);

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void sink(Object payload) {
        logger.info("Received: " + payload);
    }
}


After the change



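  • In @EnableBinding, use the array form ({}) to specify EvenOutput.class and OddOutput.class, created above, in addition to Sink.class


  • Inject the EvenOutput and OddOutput interfaces with @Autowired

  • Get the seconds from the LocalDateTime and switch the destination topic on the result: evenOutput.output().send(...) if it is even, oddOutput.output().send(...) if it is odd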

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.support.GenericMessage;

import java.time.LocalDateTime;

@EnableBinding({Sink.class, EvenOutput.class, OddOutput.class})
public class SampleSink {

    private static final Logger logger = LoggerFactory.getLogger(SampleSink.class);

    @Autowired
    private EvenOutput evenOutput;

    @Autowired
    private OddOutput oddOutput;

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void sink(Object payload) {
        logger.info("Received: " + payload);
        LocalDateTime now = (LocalDateTime) payload;
        if (now.getSecond() % 2 == 0) {
            logger.info("output even: " + payload);
            evenOutput.output().send(new GenericMessage<Object>(now));
        } else {
            logger.info("output odd : " + payload);
            oddOutput.output().send(new GenericMessage<Object>(now));
        }
    }
}
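The sink method above casts the payload straight to LocalDateTime, which assumes every message on the input topic arrives as that type. As a hedged sketch only, a defensive conversion could look like the following; the class PayloadGuard and the method asLocalDateTime are hypothetical names, and accepting ISO-8601 strings as a second form is an assumption of this sketch, not something the sample requires.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.Optional;

/** Hypothetical helper that converts an incoming payload without risking a ClassCastException. */
final class PayloadGuard {

    private PayloadGuard() {
    }

    /** Returns the payload as a LocalDateTime, or empty when it cannot be interpreted as one. */
    static Optional<LocalDateTime> asLocalDateTime(Object payload) {
        if (payload instanceof LocalDateTime) {
            return Optional.of((LocalDateTime) payload);
        }
        if (payload instanceof String) {
            try {
                // Assumption: text payloads use the ISO-8601 form produced by LocalDateTime.toString().
                return Optional.of(LocalDateTime.parse((String) payload));
            } catch (DateTimeParseException e) {
                return Optional.empty();
            }
        }
        // Any other type, including null, is treated as not routable.
        return Optional.empty();
    }
}
```

With a helper like this, the sink could log and skip unexpected payloads instead of failing inside the message handler.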


Run it


As before, do the following beforehand:


  • Start ZooKeeper

  • Start Kafka


Start the Spring Boot application

When the application starts, the log shows messages being written to the even and odd outputs.

...

(omitted)
...
com.example.SpringCloudStreamKafkaDemoApplication --spring.output.ansi.enabled=always
2015-12-06 10:54:49.168 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : Starting SpringCloudStreamKafkaDemoApplication on MBP-15UAC-184.local with PID 53162 (/Users/tgoto/Develop/git/spring-cloud-stream-kafka-demo/build/classes/main started by tgoto in /Users/tgoto/Develop/git/spring-cloud-stream-kafka-demo)
2015-12-06 10:54:49.172 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : No profiles are active
2015-12-06 10:54:49.233 INFO 53162 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@647fd8ce: startup date [Sun Dec 06 10:54:49 JST 2015]; root of context hierarchy
2015-12-06 10:54:49.738 INFO 53162 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'configurationPropertiesRebinderAutoConfiguration' of type [class org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$2d142414] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2015-12-06 10:54:49.916 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : Started SpringCloudStreamKafkaDemoApplication in 1.065 seconds (JVM running for 1.76)

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.3.0.RELEASE)

2015-12-06 10:54:50.075 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : No profiles are active
2015-12-06 10:54:50.090 INFO 53162 --- [ main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7922d892: startup date [Sun Dec 06 10:54:50 JST 2015]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@647fd8ce
2015-12-06 10:54:51.035 INFO 53162 --- [ main] o.s.b.f.config.PropertiesFactoryBean : Loading properties file from URL [jar:file:/Users/tgoto/.gradle/caches/modules-2/files-2.1/org.springframework.integration/spring-integration-core/4.2.1.RELEASE/bb42e637833fd9c17df6092790d6209872e0bd65/spring-integration-core-4.2.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
2015-12-06 10:54:51.039 INFO 53162 --- [ main] o.s.i.config.IntegrationRegistrar : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined.
...
(omitted)
...
2015-12-06 10:55:30.551 INFO 53162 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6d5f4900
2015-12-06 10:55:30.553 INFO 53162 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$4 : Adding {message-handler:inbound.spring.cloud.stream.test.topic} as a subscriber to the 'bridge.spring.cloud.stream.test.topic' channel
2015-12-06 10:55:30.553 INFO 53162 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$4 : started inbound.spring.cloud.stream.test.topic
2015-12-06 10:55:30.555 INFO 53162 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2015-12-06 10:55:30.556 INFO 53162 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147482647
2015-12-06 10:55:30.626 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:43.528
2015-12-06 10:55:30.626 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output odd : 2015-12-05T12:10:43.528
2015-12-06 10:55:30.629 INFO 53162 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2015-12-06 10:55:30.632 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : Started SpringCloudStreamKafkaDemoApplication in 41.911 seconds (JVM running for 42.478)
2015-12-06 10:55:30.753 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:44.617
2015-12-06 10:55:30.753 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output even: 2015-12-05T12:10:44.617
2015-12-06 10:55:30.858 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:45.622
2015-12-06 10:55:30.859 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output odd : 2015-12-05T12:10:45.622
2015-12-06 10:55:30.860 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:46.623
2015-12-06 10:55:30.860 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output even: 2015-12-05T12:10:46.623


Check that the topics have been created

~ ❯❯❯ /usr/local/bin/kafka-topics.sh  --list --zookeeper localhost:2181

spring.cloud.stream.test.even.topic
spring.cloud.stream.test.odd.topic
spring.cloud.stream.test.topic


Check the contents of a topic

The LocalDateTime values themselves are not readable, but something is being written.

~ ❯❯❯ /usr/local/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic spring.cloud.stream.test.even.topic

?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?

????,
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?

???.
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?

????0
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?

????2
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?
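The console consumer output above is unreadable because the payload is written as a serialized Java object rather than as text. If a human-readable topic is wanted, one option is to send the ISO-8601 text form of the timestamp and parse it back on the receiving side. The sketch below shows only that round trip using the JDK; the class ReadablePayload and its methods are hypothetical, and how the string would be handed to the binder is left out.

```java
import java.time.LocalDateTime;

/** Hypothetical sketch: text round trip for a LocalDateTime payload. */
final class ReadablePayload {

    private ReadablePayload() {
    }

    /** Encodes the timestamp as ISO-8601 text, e.g. 2015-12-05T12:10:44.617. */
    static String encode(LocalDateTime time) {
        return time.toString();
    }

    /** Decodes text produced by encode back into a LocalDateTime. */
    static LocalDateTime decode(String text) {
        return LocalDateTime.parse(text);
    }

    public static void main(String[] args) {
        LocalDateTime original = LocalDateTime.of(2015, 12, 5, 12, 10, 44, 617_000_000);
        String text = encode(original);
        System.out.println(text + " round-trips: " + original.equals(decode(text)));
    }
}
```

The trade-off is that the consumer then has to parse the string itself instead of receiving a typed object.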