spring-boot
Kafka

Spring Cloud Streamを試す(kafka)

More than 3 years have passed since last update.


はじめに

前回試した、Spring Cloud Streamでredisにつなげるやり方でkafkaでやってみる。


環境


手順


IntelliJからプロジェクトを作成する


プロジェクトを作成メニューを開く

image


左のペインから「Spring Initializr」を選択して、「Next」ボタンをクリック

image


「Name」に適当に名前をつけて「Next」ボタンをクリック

image


「Dependencies」からStream Kafkaを選択して、「Next」ボタンをクリック

image


「Project Name」に適当に名前をいれて「Finish」ボタンをクリック

image

※しばらくしてGradleの設定画面がでてきたら、「OK」ボタンをクリックする。


プロジェクトができあがる

image


デフォルトの状態で起動してみる

自動的に生成されたsrc/main/java/xxx.xxx/XXXXXXApplicationを右クリックして、「Run 'XXXX'」を選択

image


立ち上がる

image

この時点ではまだkafkaに接続しないようだ。


kafkaに接続できるように設定する


kafkaバインド用のクラスを作成する

XXXXXApplicationクラスと同じパッケージ内にkafkaバインド用のクラスを作成する。

作成するクラスは、spring-cloud-streamのgithubのサンプルを参考にして適当に作成する。

image

クラス名は、SampleSourceにした。

image

fixedDelayは何秒おきにメッセージを送るかという設定

maxMessagesPerPollは1回に最大何件メッセージを送るかという設定

package com.example;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

import java.time.LocalDateTime;

@EnableBinding(Source.class)
public class SampleSource {

@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<?> source() {
return () -> new GenericMessage<>(LocalDateTime.now());
}
}

ラムダ式でコンパイルエラーがでる場合は、Projectの言語レベルを変更する。

プロジェクトを右クリックして、「Open Module Settings」を選択する。

image

「Language level」を「8 - Lambdas, type annotations etc.」に変更して「OK」ボタンをクリック

image


application.propertiesをymlに変更する

application.propertiesのファイルを右クリックして、「Refactor」-「Rename...」

image


拡張子をymlに変更して「Refactor」ボタンをクリック

image


application.ymlにkafkaのtopicを指定する

topicは存在しないtopicを指定しもOK。存在しない場合は、勝手に作成される。

spring:

cloud:
stream:
bindings:
output: spring.cloud.stream.test.topic


Sourceクラスを作成した状態でアプリケーションを立ち上げてみる


zooKeeperを立ち上げる


~ ❯❯❯ zkServer start
JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED


kafkaを立ち上げる

~ ❯❯❯ kafka-server-start.sh /usr/local/etc/kafka/server.properties

起動ログは後半省略

[2015-12-05 12:13:28,666] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2015-12-05 12:13:28,701] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,701] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,701] INFO Property log.dirs is overridden to /usr/local/var/lib/kafka-logs (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property log.retention.check.interval.ms is overridden to 300000 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property log.segment.bytes is overridden to 1073741824 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property num.network.threads is overridden to 3 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property num.partitions is overridden to 1 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property num.recovery.threads.per.data.dir is overridden to 1 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property socket.receive.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property socket.send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property zookeeper.connection.timeout.ms is overridden to 6000 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,740] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2015-12-05 12:13:28,743] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2015-12-05 12:13:28,753] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2015-12-05 12:13:28,761] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2015-12-05 12:13:28,761] INFO Client environment:host.name=softbank126097236203.bbtec.net (org.apache.zookeeper.ZooKeeper)
[2015-12-05 12:13:28,761] INFO Client environment:java.version=1.8.0_66 (org.apache.zookeeper.ZooKeeper)
..........


SpringBootを立ち上げる

手順は前と同じで、SpringBootのApplicationクラスを右クリックして「Run XXXX」を選択する。

立ち上がる。

が、このままだと本当にメッセージが書き込まれているかわからない。

image


kafkaのコマンドでtopicの存在を確認する。

できている。

~ ❯❯❯ /usr/local/bin/kafka-topics.sh  --list --zookeeper localhost:2181

spring.cloud.stream.test.topic

~ ❯❯❯ /usr/local/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic spring.cloud.stream.test.topic

Topic:spring.cloud.stream.test.topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: spring.cloud.stream.test.topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0


kafkaのメッセージを受け取ってログに出す


kafkaのメッセージを読み込むクラスを作成する

Sourceと同じパッケージに、SampleSinkという名前で以下を作成

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;

@EnableBinding(Sink.class)
public class SampleSink {

private static Logger logger = LoggerFactory.getLogger(SampleSink.class);

@ServiceActivator(inputChannel = Sink.INPUT)
public void sink(Object payload) {
logger.info("Received: " + payload);
}
}


application.ymlにinputの接続設定を追加

spring:

cloud:
stream:
bindings:
output: spring.cloud.stream.test.topic
input: spring.cloud.stream.test.topic


SpringBootのアプリケーションを再度立ち上げる

約1秒ごとにメッセージが書き込まれて、SampleSinkクラスによってログに出力されているのがわかる。

image