はじめに
前回Spring Cloud Streamでkafkaに接続するサンプルを試したが、今回はkafkaでメッセージを受け取って、受け取ったメッセージを複数のoutputに振り分けるmulti outputを試してみる。
環境
前回と同じ
手順
前回作った1秒おきに時間をメッセージで送るサンプルを元に、時間の秒が偶数か奇数かを判断して、メッセージを偶数用のtopicと奇数用のtopicに振り分ける処理を作成する。
kafka接続用プロジェクトを作成する
前回
※以下のサンプルも上記のプロジェクトを流用します。
kafkaのメッセージ受け取り用の設定を行う
application.ymlに出力先のtopicを追加
偶数用のメッセージと奇数用のメッセージを書き込むtopicを指定
spring:
cloud:
stream:
bindings:
output: spring.cloud.stream.test.topic
input: spring.cloud.stream.test.topic
even: spring.cloud.stream.test.even.topic # 偶数
odd: spring.cloud.stream.test.odd.topic # 奇数
偶数のメッセージを書き込むためのチャネルを定義するインターフェースを作成
EvenOutput.java
package com.example;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface EvenOutput {
String OUTPUT = "spring.cloud.stream.test.even.topic";
@Output("spring.cloud.stream.test.even.topic")
MessageChannel output();
}
奇数のメッセージを書き込むためのチャネルを定義するインターフェースを作成
OddOutput.java
package com.example;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface OddOutput {
String OUTPUT = "spring.cloud.stream.test.odd.topic";
@Output("spring.cloud.stream.test.odd.topic")
MessageChannel output();
}
前回作ったSampleSink
を改造する
前回作った、SampleSink
にメッセージを振り分ける処理を追加する
前回の状態
SampleSink.java
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
@EnableBinding(Sink.class)
public class SampleSink {
private static Logger logger = LoggerFactory.getLogger(SampleSink.class);
@ServiceActivator(inputChannel = Sink.INPUT)
public void sink(Object payload) {
logger.info("Received: " + payload);
}
}
変更後
-
@EnableBiddingに
{}
(配列形式)でSink.class
の他に、先ほど作成したEvenOutput.class
とOddOutput.class
を指定 -
@Autowiredで、
EvenOutput
とOddOutput
のインターフェースを指定 - LocalDateTimeの秒を取得して、偶数だったら
evenOutput.output().send(...)
、奇数だったらoddOutput.output().send(...)のように条件によって書き込むtopicを変更する
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.support.GenericMessage;
import java.time.LocalDateTime;
@EnableBinding({Sink.class, EvenOutput.class, OddOutput.class})
public class SampleSink {
private static Logger logger = LoggerFactory.getLogger(SampleSink.class);
@Autowired
private EvenOutput evenOutput;
@Autowired
private OddOutput oddOutput;
@ServiceActivator(inputChannel = Sink.INPUT)
public void sink(Object payload) {
logger.info("Received: " + payload);
LocalDateTime now = (LocalDateTime) payload;
if (now.getSecond() % 2 == 0) {
logger.info("output even: " + payload);
evenOutput.output().send(new GenericMessage<Object>(now));
} else {
logger.info("output odd : " + payload);
oddOutput.output().send(new GenericMessage<Object>(now));
}
}
}
実行してみる
前回同様以下を事前に実施
- zookeeperを立ち上げる
- kafkaを立ち上げる
SpringBootApplicationの起動
Applicationを起動すると、偶数、奇数でメッセージが書き込まれているのがわかる。
...
(省略)
...
com.example.SpringCloudStreamKafkaDemoApplication --spring.output.ansi.enabled=always
2015-12-06 10:54:49.168 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : Starting SpringCloudStreamKafkaDemoApplication on MBP-15UAC-184.local with PID 53162 (/Users/tgoto/Develop/git/spring-cloud-stream-kafka-demo/build/classes/main started by tgoto in /Users/tgoto/Develop/git/spring-cloud-stream-kafka-demo)
2015-12-06 10:54:49.172 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : No profiles are active
2015-12-06 10:54:49.233 INFO 53162 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@647fd8ce: startup date [Sun Dec 06 10:54:49 JST 2015]; root of context hierarchy
2015-12-06 10:54:49.738 INFO 53162 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'configurationPropertiesRebinderAutoConfiguration' of type [class org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$2d142414] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2015-12-06 10:54:49.916 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : Started SpringCloudStreamKafkaDemoApplication in 1.065 seconds (JVM running for 1.76)
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v1.3.0.RELEASE)
2015-12-06 10:54:50.075 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : No profiles are active
2015-12-06 10:54:50.090 INFO 53162 --- [ main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7922d892: startup date [Sun Dec 06 10:54:50 JST 2015]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@647fd8ce
2015-12-06 10:54:51.035 INFO 53162 --- [ main] o.s.b.f.config.PropertiesFactoryBean : Loading properties file from URL [jar:file:/Users/tgoto/.gradle/caches/modules-2/files-2.1/org.springframework.integration/spring-integration-core/4.2.1.RELEASE/bb42e637833fd9c17df6092790d6209872e0bd65/spring-integration-core-4.2.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
2015-12-06 10:54:51.039 INFO 53162 --- [ main] o.s.i.config.IntegrationRegistrar : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined.
...
(省略)
...
2015-12-06 10:55:30.551 INFO 53162 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6d5f4900
2015-12-06 10:55:30.553 INFO 53162 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$4 : Adding {message-handler:inbound.spring.cloud.stream.test.topic} as a subscriber to the 'bridge.spring.cloud.stream.test.topic' channel
2015-12-06 10:55:30.553 INFO 53162 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$4 : started inbound.spring.cloud.stream.test.topic
2015-12-06 10:55:30.555 INFO 53162 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2015-12-06 10:55:30.556 INFO 53162 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147482647
2015-12-06 10:55:30.626 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:43.528
2015-12-06 10:55:30.626 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output odd : 2015-12-05T12:10:43.528
2015-12-06 10:55:30.629 INFO 53162 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2015-12-06 10:55:30.632 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : Started SpringCloudStreamKafkaDemoApplication in 41.911 seconds (JVM running for 42.478)
2015-12-06 10:55:30.753 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:44.617
2015-12-06 10:55:30.753 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output even: 2015-12-05T12:10:44.617
2015-12-06 10:55:30.858 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:45.622
2015-12-06 10:55:30.859 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output odd : 2015-12-05T12:10:45.622
2015-12-06 10:55:30.860 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:46.623
2015-12-06 10:55:30.860 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output even: 2015-12-05T12:10:46.623
topicが作成されているか確認
~ ❯❯❯ /usr/local/bin/kafka-topics.sh --list --zookeeper localhost:2181
spring.cloud.stream.test.even.topic
spring.cloud.stream.test.odd.topic
spring.cloud.stream.test.topic
topicの中身を確認
LocalDateTime
の値までは見えないが何かしら書き込まれていはいる。
~ ❯❯❯ /usr/local/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic spring.cloud.stream.test.even.topic
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?
????,
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?
???.
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?
????0
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?
????2
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?