LoginSignup
6
5

More than 5 years have passed since last update.

Spring Cloud Streamを試す(kafka)

Posted at

はじめに

前回試した、Spring Cloud Streamでredisにつなげるやり方でkafkaでやってみる。

環境

手順

IntelliJからプロジェクトを作成する

プロジェクトを作成メニューを開く

image

左のペインから「Spring Initializr」を選択して、「Next」ボタンをクリック

image

「Name」に適当に名前をつけて「Next」ボタンをクリック

image

「Dependencies」からStream Kafkaを選択して、「Next」ボタンをクリック

image

「Project Name」に適当に名前をいれて「Finish」ボタンをクリック

image

※しばらくしてGradleの設定画面がでてきたら、「OK」ボタンをクリックする。

プロジェクトができあがる

image

デフォルトの状態で起動してみる

自動的に生成されたsrc/main/java/xxx.xxx/XXXXXXApplicationを右クリックして、「Run 'XXXX'」を選択
image

立ち上がる

image

この時点ではまだkafkaに接続しないようだ。

kafkaに接続できるように設定する

kafkaバインド用のクラスを作成する

XXXXXApplicationクラスと同じパッケージ内にkafkaバインド用のクラスを作成する。
作成するクラスは、spring-cloud-streamのgithubのサンプルを参考にして適当に作成する。

image

クラス名は、SampleSourceにした。
image

fixedDelayは何秒おきにメッセージを送るかという設定
maxMessagesPerPollは1回に最大何件メッセージを送るかという設定

package com.example;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

import java.time.LocalDateTime;

@EnableBinding(Source.class)
public class SampleSource {

  @Bean
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
  public MessageSource<?> source() {
    return () -> new GenericMessage<>(LocalDateTime.now());
  }
}

ラムダ式でコンパイルエラーがでる場合は、Projectの言語レベルを変更する。
プロジェクトを右クリックして、「Open Module Settings」を選択する。

image

「Language level」を「8 - Lambdas, type annotations etc.」に変更して「OK」ボタンをクリック

image

application.propertiesをymlに変更する

application.propertiesのファイルを右クリックして、「Refactor」-「Rename...」
image

拡張子をymlに変更して「Refactor」ボタンをクリック

image

application.ymlにkafkaのtopicを指定する

topicは存在しないtopicを指定しもOK。存在しない場合は、勝手に作成される。

spring:
  cloud:
    stream:
      bindings:
        output: spring.cloud.stream.test.topic

Sourceクラスを作成した状態でアプリケーションを立ち上げてみる

zooKeeperを立ち上げる


~ ❯❯❯ zkServer start
JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED

kafkaを立ち上げる

~ ❯❯❯ kafka-server-start.sh /usr/local/etc/kafka/server.properties

起動ログは後半省略

[2015-12-05 12:13:28,666] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,701] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,701] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,701] INFO Property log.dirs is overridden to /usr/local/var/lib/kafka-logs (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property log.retention.check.interval.ms is overridden to 300000 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property log.segment.bytes is overridden to 1073741824 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property num.network.threads is overridden to 3 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property num.partitions is overridden to 1 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property num.recovery.threads.per.data.dir is overridden to 1 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property socket.receive.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property socket.send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property zookeeper.connection.timeout.ms is overridden to 6000 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,740] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2015-12-05 12:13:28,743] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2015-12-05 12:13:28,753] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2015-12-05 12:13:28,761] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2015-12-05 12:13:28,761] INFO Client environment:host.name=softbank126097236203.bbtec.net (org.apache.zookeeper.ZooKeeper)
[2015-12-05 12:13:28,761] INFO Client environment:java.version=1.8.0_66 (org.apache.zookeeper.ZooKeeper)
..........

SpringBootを立ち上げる

手順は前と同じで、SpringBootのApplicationクラスを右クリックして「Run XXXX」を選択する。
立ち上がる。
が、このままだと本当にメッセージが書き込まれているかわからない。

image

kafkaのコマンドでtopicの存在を確認する。

できている。

~ ❯❯❯ /usr/local/bin/kafka-topics.sh  --list --zookeeper localhost:2181
spring.cloud.stream.test.topic
~ ❯❯❯ /usr/local/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic spring.cloud.stream.test.topic
Topic:spring.cloud.stream.test.topic    PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: spring.cloud.stream.test.topic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0

kafkaのメッセージを受け取ってログに出す

kafkaのメッセージを読み込むクラスを作成する

Sourceと同じパッケージに、SampleSinkという名前で以下を作成

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;

@EnableBinding(Sink.class)
public class SampleSink {

  private static Logger logger = LoggerFactory.getLogger(SampleSink.class);

  @ServiceActivator(inputChannel = Sink.INPUT)
  public void sink(Object payload) {
    logger.info("Received: " + payload);
  }
}

application.ymlにinputの接続設定を追加

spring:
  cloud:
    stream:
      bindings:
        output: spring.cloud.stream.test.topic
        input: spring.cloud.stream.test.topic

SpringBootのアプリケーションを再度立ち上げる

約1秒ごとにメッセージが書き込まれて、SampleSinkクラスによってログに出力されているのがわかる。

image

6
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
5