はじめに
前回試した、Spring Cloud Streamでredisにつなげるやり方でkafkaでやってみる。
環境
-
Mac
-
IntelliJ 15
-
kafka
手順
IntelliJからプロジェクトを作成する
プロジェクトを作成メニューを開く
左のペインから「Spring Initializr」を選択して、「Next」ボタンをクリック
「Name」に適当に名前をつけて「Next」ボタンをクリック
「Dependencies」からStream Kafka
を選択して、「Next」ボタンをクリック
「Project Name」に適当に名前をいれて「Finish」ボタンをクリック
※しばらくしてGradleの設定画面がでてきたら、「OK」ボタンをクリックする。
プロジェクトができあがる
デフォルトの状態で起動してみる
自動的に生成されたsrc/main/java/xxx.xxx/XXXXXXApplicationを右クリックして、「Run 'XXXX'」を選択
立ち上がる
この時点ではまだkafkaに接続しないようだ。
kafkaに接続できるように設定する
kafkaバインド用のクラスを作成する
XXXXXApplicationクラスと同じパッケージ内にkafkaバインド用のクラスを作成する。
作成するクラスは、spring-cloud-streamのgithubのサンプルを参考にして適当に作成する。
fixedDelay
は何秒おきにメッセージを送るかという設定
maxMessagesPerPoll
は1回に最大何件メッセージを送るかという設定
package com.example;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
import java.time.LocalDateTime;
@EnableBinding(Source.class)
public class SampleSource {
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<?> source() {
return () -> new GenericMessage<>(LocalDateTime.now());
}
}
ラムダ式でコンパイルエラーがでる場合は、Projectの言語レベルを変更する。
プロジェクトを右クリックして、「Open Module Settings」を選択する。
「Language level」を「8 - Lambdas, type annotations etc.」に変更して「OK」ボタンをクリック
application.propertiesをymlに変更する
application.propertiesのファイルを右クリックして、「Refactor」-「Rename...」
拡張子をymlに変更して「Refactor」ボタンをクリック
application.ymlにkafkaのtopicを指定する
topicは存在しないtopicを指定しもOK。存在しない場合は、勝手に作成される。
spring:
cloud:
stream:
bindings:
output: spring.cloud.stream.test.topic
Sourceクラスを作成した状態でアプリケーションを立ち上げてみる
zooKeeperを立ち上げる
~ ❯❯❯ zkServer start
JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED
kafkaを立ち上げる
~ ❯❯❯ kafka-server-start.sh /usr/local/etc/kafka/server.properties
起動ログは後半省略
[2015-12-05 12:13:28,666] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,701] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,701] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,701] INFO Property log.dirs is overridden to /usr/local/var/lib/kafka-logs (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property log.retention.check.interval.ms is overridden to 300000 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property log.segment.bytes is overridden to 1073741824 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property num.network.threads is overridden to 3 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,702] INFO Property num.partitions is overridden to 1 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property num.recovery.threads.per.data.dir is overridden to 1 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property socket.receive.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property socket.send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,703] INFO Property zookeeper.connection.timeout.ms is overridden to 6000 (kafka.utils.VerifiableProperties)
[2015-12-05 12:13:28,740] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2015-12-05 12:13:28,743] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2015-12-05 12:13:28,753] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2015-12-05 12:13:28,761] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2015-12-05 12:13:28,761] INFO Client environment:host.name=softbank126097236203.bbtec.net (org.apache.zookeeper.ZooKeeper)
[2015-12-05 12:13:28,761] INFO Client environment:java.version=1.8.0_66 (org.apache.zookeeper.ZooKeeper)
..........
SpringBootを立ち上げる
手順は前と同じで、SpringBootのApplicationクラスを右クリックして「Run XXXX」を選択する。
立ち上がる。
が、このままだと本当にメッセージが書き込まれているかわからない。
kafkaのコマンドでtopicの存在を確認する。
できている。
~ ❯❯❯ /usr/local/bin/kafka-topics.sh --list --zookeeper localhost:2181
spring.cloud.stream.test.topic
~ ❯❯❯ /usr/local/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic spring.cloud.stream.test.topic
Topic:spring.cloud.stream.test.topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: spring.cloud.stream.test.topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
kafkaのメッセージを受け取ってログに出す
kafkaのメッセージを読み込むクラスを作成する
Sourceと同じパッケージに、SampleSink
という名前で以下を作成
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
@EnableBinding(Sink.class)
public class SampleSink {
private static Logger logger = LoggerFactory.getLogger(SampleSink.class);
@ServiceActivator(inputChannel = Sink.INPUT)
public void sink(Object payload) {
logger.info("Received: " + payload);
}
}
application.ymlにinputの接続設定を追加
spring:
cloud:
stream:
bindings:
output: spring.cloud.stream.test.topic
input: spring.cloud.stream.test.topic
SpringBootのアプリケーションを再度立ち上げる
約1秒ごとにメッセージが書き込まれて、SampleSink
クラスによってログに出力されているのがわかる。