Trying Spring Cloud Stream (routing messages to multiple outputs with Kafka)

Posted at 2015-12-06

Introduction

In the previous article I tried a sample that connects Spring Cloud Stream to Kafka. This time I'll try multi output: receiving messages from Kafka and routing each received message to one of several outputs.

Environment

Same as the previous article.

Steps

Starting from the sample built last time, which sends the current time as a message once per second, I'll add logic that checks whether the seconds value of the time is even or odd and routes the message to an even topic or an odd topic accordingly.

Create a project for connecting to Kafka

Previous article
Note: the samples below reuse the project from the article above.

Set up the configuration for receiving Kafka messages

Add the output topics to application.yml

Specify the topics to which the even and odd messages will be written.

spring:
  cloud:
    stream:
      bindings:
        output: spring.cloud.stream.test.topic
        input: spring.cloud.stream.test.topic
        even: spring.cloud.stream.test.even.topic # even seconds
        odd: spring.cloud.stream.test.odd.topic # odd seconds

Create an interface that defines the channel for writing even-second messages

EvenOutput.java
package com.example;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface EvenOutput {
  String OUTPUT = "spring.cloud.stream.test.even.topic";

  @Output("spring.cloud.stream.test.even.topic")
  MessageChannel output();
}

Create an interface that defines the channel for writing odd-second messages

OddOutput.java
package com.example;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface OddOutput {
  String OUTPUT = "spring.cloud.stream.test.odd.topic";

  @Output("spring.cloud.stream.test.odd.topic")
  MessageChannel output();
}
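
When these interfaces are listed in @EnableBinding, Spring Cloud Stream creates their implementations at runtime, and the MessageChannel returned by output() is bound under the channel name given in @Output (here the channel name is the same string as the Kafka topic). Referencing the constant in the annotation, e.g. @Output(EvenOutput.OUTPUT), would also avoid repeating the topic string.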

Modify the SampleSink from the previous article

Add the message-routing logic to the SampleSink created last time.

Previous state

SampleSink.java
package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;

@EnableBinding(Sink.class)
public class SampleSink {

  private static Logger logger = LoggerFactory.getLogger(SampleSink.class);

  @ServiceActivator(inputChannel = Sink.INPUT)
  public void sink(Object payload) {
    logger.info("Received: " + payload);
  }
}

After the change

  • In @EnableBinding({...}) (array form), specify the EvenOutput.class and OddOutput.class created above in addition to Sink.class
  • Inject the EvenOutput and OddOutput interfaces with @Autowired
  • Get the seconds value from the LocalDateTime and switch the target topic based on it: evenOutput.output().send(...) when even, oddOutput.output().send(...) when odd

SampleSink.java
package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.support.GenericMessage;

import java.time.LocalDateTime;

@EnableBinding({Sink.class, EvenOutput.class, OddOutput.class})
public class SampleSink {

  private static Logger logger = LoggerFactory.getLogger(SampleSink.class);

  @Autowired
  private EvenOutput evenOutput;

  @Autowired
  private OddOutput oddOutput;

  @ServiceActivator(inputChannel = Sink.INPUT)
  public void sink(Object payload) {
    logger.info("Received: " + payload);
    LocalDateTime now = (LocalDateTime) payload;
    // Route to the even or odd output channel based on the seconds value
    if (now.getSecond() % 2 == 0) {
      logger.info("output even: " + payload);
      evenOutput.output().send(new GenericMessage<Object>(now));
    } else {
      logger.info("output odd : " + payload);
      oddOutput.output().send(new GenericMessage<Object>(now));
    }
  }
}
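
If you just want to check the routing logic without a running broker, a plain unit test with mocked channels is enough. The following is only a sketch and not part of the original sample; it assumes Mockito and spring-test are available on the test classpath.

SampleSinkTest.java
package com.example;

import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.util.ReflectionTestUtils;

import java.time.LocalDateTime;

import static org.mockito.Mockito.*;

public class SampleSinkTest {

  @Test
  public void evenSecondsGoToTheEvenChannel() {
    // Mock the two output bindings so no Kafka broker is needed
    EvenOutput evenOutput = mock(EvenOutput.class);
    OddOutput oddOutput = mock(OddOutput.class);
    MessageChannel evenChannel = mock(MessageChannel.class);
    MessageChannel oddChannel = mock(MessageChannel.class);
    when(evenOutput.output()).thenReturn(evenChannel);
    when(oddOutput.output()).thenReturn(oddChannel);

    // Inject the mocks into the @Autowired fields
    SampleSink sink = new SampleSink();
    ReflectionTestUtils.setField(sink, "evenOutput", evenOutput);
    ReflectionTestUtils.setField(sink, "oddOutput", oddOutput);

    // 10:55:30 has an even seconds value, so only the even channel should receive a message
    sink.sink(LocalDateTime.of(2015, 12, 6, 10, 55, 30));

    verify(evenChannel).send(any(Message.class));
    verify(oddChannel, never()).send(any(Message.class));
  }
}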

Run it

As before, do the following first:

  • Start ZooKeeper
  • Start Kafka

Start the Spring Boot application

When the application starts, you can see messages being written to the even and odd topics.

...
(omitted)
...
com.example.SpringCloudStreamKafkaDemoApplication --spring.output.ansi.enabled=always
2015-12-06 10:54:49.168  INFO 53162 --- [           main] .e.SpringCloudStreamKafkaDemoApplication : Starting SpringCloudStreamKafkaDemoApplication on MBP-15UAC-184.local with PID 53162 (/Users/tgoto/Develop/git/spring-cloud-stream-kafka-demo/build/classes/main started by tgoto in /Users/tgoto/Develop/git/spring-cloud-stream-kafka-demo)
2015-12-06 10:54:49.172  INFO 53162 --- [           main] .e.SpringCloudStreamKafkaDemoApplication : No profiles are active
2015-12-06 10:54:49.233  INFO 53162 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@647fd8ce: startup date [Sun Dec 06 10:54:49 JST 2015]; root of context hierarchy
2015-12-06 10:54:49.738  INFO 53162 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'configurationPropertiesRebinderAutoConfiguration' of type [class org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$2d142414] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2015-12-06 10:54:49.916  INFO 53162 --- [           main] .e.SpringCloudStreamKafkaDemoApplication : Started SpringCloudStreamKafkaDemoApplication in 1.065 seconds (JVM running for 1.76)

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.3.0.RELEASE)

2015-12-06 10:54:50.075  INFO 53162 --- [           main] .e.SpringCloudStreamKafkaDemoApplication : No profiles are active
2015-12-06 10:54:50.090  INFO 53162 --- [           main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7922d892: startup date [Sun Dec 06 10:54:50 JST 2015]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@647fd8ce
2015-12-06 10:54:51.035  INFO 53162 --- [           main] o.s.b.f.config.PropertiesFactoryBean     : Loading properties file from URL [jar:file:/Users/tgoto/.gradle/caches/modules-2/files-2.1/org.springframework.integration/spring-integration-core/4.2.1.RELEASE/bb42e637833fd9c17df6092790d6209872e0bd65/spring-integration-core-4.2.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
2015-12-06 10:54:51.039  INFO 53162 --- [           main] o.s.i.config.IntegrationRegistrar        : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. 
...
(omitted)
...
2015-12-06 10:55:30.551  INFO 53162 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6d5f4900
2015-12-06 10:55:30.553  INFO 53162 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : Adding {message-handler:inbound.spring.cloud.stream.test.topic} as a subscriber to the 'bridge.spring.cloud.stream.test.topic' channel
2015-12-06 10:55:30.553  INFO 53162 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : started inbound.spring.cloud.stream.test.topic
2015-12-06 10:55:30.555  INFO 53162 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2015-12-06 10:55:30.556  INFO 53162 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147482647
2015-12-06 10:55:30.626  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : Received: 2015-12-05T12:10:43.528
2015-12-06 10:55:30.626  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : output odd : 2015-12-05T12:10:43.528
2015-12-06 10:55:30.629  INFO 53162 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2015-12-06 10:55:30.632  INFO 53162 --- [           main] .e.SpringCloudStreamKafkaDemoApplication : Started SpringCloudStreamKafkaDemoApplication in 41.911 seconds (JVM running for 42.478)
2015-12-06 10:55:30.753  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : Received: 2015-12-05T12:10:44.617
2015-12-06 10:55:30.753  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : output even: 2015-12-05T12:10:44.617
2015-12-06 10:55:30.858  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : Received: 2015-12-05T12:10:45.622
2015-12-06 10:55:30.859  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : output odd : 2015-12-05T12:10:45.622
2015-12-06 10:55:30.860  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : Received: 2015-12-05T12:10:46.623
2015-12-06 10:55:30.860  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : output even: 2015-12-05T12:10:46.623

Check that the topics have been created

~ ❯❯❯ /usr/local/bin/kafka-topics.sh  --list --zookeeper localhost:2181
spring.cloud.stream.test.even.topic
spring.cloud.stream.test.odd.topic
spring.cloud.stream.test.topic

Check the contents of the topics

The LocalDateTime values themselves aren't readable, but something is clearly being written.

~ ❯❯❯ /usr/local/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic spring.cloud.stream.test.even.topic
?
 contentType8"application/x-java-object;type=java.time.LocalDateTime"
                                                                     ?

????,
?
 contentType8"application/x-java-object;type=java.time.LocalDateTime"
                                                                     ?

???.
?
 contentType8"application/x-java-object;type=java.time.LocalDateTime"
                                                                     ?

????0
?
 contentType8"application/x-java-object;type=java.time.LocalDateTime"
                                                                     ?

????2
?
 contentType8"application/x-java-object;type=java.time.LocalDateTime"
                                                                     ?
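
As an aside (not something the original sample does): if you want the console consumer output to be human-readable, one option is to send the timestamp as a String instead of a LocalDateTime, so the payload is not serialized as a Java object. A minimal sketch of the change inside SampleSink.sink():

    // Assumption: same SampleSink as above; only the payload type changes.
    // A String payload is written as plain text, so kafka-console-consumer can display it
    // (the binder may still prepend header bytes).
    if (now.getSecond() % 2 == 0) {
      evenOutput.output().send(new GenericMessage<String>(now.toString()));
    } else {
      oddOutput.output().send(new GenericMessage<String>(now.toString()));
    }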