#はじめに
最近kafkaをspringbootで使おうと思って、いろいろと調べてました。
springKafkaは知名度に反して日本語サイトで参考になるサイトが少なかったので、自分用の忘備録として残しておこうと思います。
この記事は、Springbootを使ったHello World!からSpring kafkaによるメッセージ送受信を実装していきます。
詳しいkafkaについての解説などは行いませんので、そちらが気になる方は以下の記事を参照してください。
Apache Kafkaの概要とアーキテクチャ
#環境
今回の記事で前提としている環境はこちら
- VScode 1.38.1
- java version "1.8.0_221"
- windows 10
もしまだVSCodeやJavaをインストールしていない場合は色んなサイトでインストール方法を紹介しているのでそちらを参照しながらやると良いと思います.今回はWindowsで話を進めますが,MacやLinuxでもおそらく変わらないはずです.
##VSCode
##JDK
-
だいたいこれでできる
OpenJDK(Java)を最新のUbuntuにインストール -
windowsはこれ
初心者でも簡単にできる!Javaをインストールする方法(Windows編)
注意しなければいけないこととしては、OracleからJavaをインストールする場合はOracleにアカウントを作らなければいけないこと。(アカウント作らなくてもインストールできるの?)
##VSCodeでJavaの動作確認
- 上の記事でJavaのバージョンが確認できたら,VSCodeでJavaが開発できるように環境を整える
VSCodeでJavaの開発環境を構築する
SampleでJavaコードがなにか動かせたら問題なし.
また今回はSpring-bootアプリを作るのでこちらもインストールしておくと便利です。アノテーションなどspring-bootでよく使う補完機能やらDashboardやらをまとめてインストールしてくれます。
#まずはSpringBootで「Hello World!」
上の設定で準備ができたら、Spring bootアプリをVSCode内で行っていきます。
##VSCode内でのspringbootアプリの作り方
VSCode内で「Shift+Ctrl+P」を押下するとコマンドパレットが開けます。そこに「spring initializr」と打ちましょう。
このような画面が出てくるので、好きなものを選んでください。
ネットの記事でよく見るのはMavenですが、pomをいちいち設定するのが面倒くさいので今回の記事ではGradleを選択します。
次に、どの言語で書くか聞かれるので好きな言語を選んでください。今回の記事ではJavaを選択します。
次にグループIDを聞かれるので好きなものに変えましょう。
もちろん、このままでもいいのですがせっかくなのでkafkaのサンプルなのが分かるような名前にします。
次にプロジェクト名を聞かれるので好きなものにしましょう。
もちろんこのままでも構いませんが、今回はこのプロジェクト名も変更します。
次にspring-bootのバージョンを選びます。
公式サイトだとこれ以外のバージョンも選べるようですが、あまりその意味はないので今回は素直に一番上の2.1.9を選びます。
つぎに、spring-bootにいれる設定やらを選びます。Last usedは前回Spring-bootで作った設定をそのまま使えます。
正直たくさんありすぎてわからないのですが、便利なLombok先輩と今回の記事のメインであるSpring for Apache Kafkam、あとついでにSpring webの3つを選んでおけばとりあえず問題ありません。
↑選ぶと今何個選んでいるか教えてくれる。
この状態になったら「Selected ** dependencies」でエンターしてください。
どこでそのアプリを作るのか聞かれるので、好きなフォルダを選び「Generate into this folder」をクリック。
無事に成功すれば右下にワークスペースをどこにするのか尋ねるポップアップが表示されます。
「Open」をクリックするとVSCodeがもう一つ立ち上がり別途ワークスペースを作成してくれます。
「Add to WorkSpace」をクリックすると現在開いているワークスペース内に作成されます。
できたらこんな感じです。
ここでも右下にポップアップが出てきますが、これもどれを選んでも構いません。
spring bootで「Hello World!」
さっそく「Hello World!」を出力できるようにしていきましょう。
作り始めたばかりの時は、下のようなMainクラスが作成されているはずです。
package com.kafka.sample.kafka_sample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
ここに作成してもいいのですが、せっかくなのでControllerクラスを作ってWebからアクセスできるようにしましょう。
DemoApplicationと同じフォルダーで右クリックし、「新しいファイルを作成」をクリック。「Controller.java」を作成しましょう。
そこで「Hello World!」を出力できるようにコードを書きます。
こんな感じ
package com.kafka.sample.kafka_sample;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/kafka-sample")
public class Controller {
@GetMapping
public String viewHelloWorld() {
return "Hello world!";
}
}
では、動かしてみます。
VSCode内でF5を押します。
初めて起動するとき、アプリの構成を示したJsonファイルが表示されます。
{
// IntelliSense を使用して利用可能な属性を学べます。
// 既存の属性の説明をホバーして表示します。
// 詳細情報は次を確認してください: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "java",
"name": "Debug (Launch) - Current File",
"request": "launch",
"mainClass": "${file}"
},
{
"type": "java",
"name": "Debug (Launch)-DemoApplication<kafka_sample>",
"request": "launch",
"mainClass": "com.kafka.sample.kafka_sample.DemoApplication",
"projectName": "kafka_sample"
}
]
}
おそらく問題がないので、その場でもう一度F5を押します。
すると、ターミナル上でこのように起動します。
起動したら、先ほど指定したURLにlocalhostでアクセスしてみましょう。
こんな感じで表示されていれば成功です。
これでspringbootをつかった「Hello World!」は終わりです。
Spring kafkaを使ったデータ受け渡し
ようやくここから本題です。
まずはkafkaを扱うための準備をしましょう。
kafkaを扱うためにはkafkaをインストールしなければなりません。
昨今のトレンドとしてはDockerなどを使ってローカル環境を汚さずに使うのですが、今回はそこにまで言及すると趣旨がずれるのでまた別の機会に記事にします。
kafkaのインストール
公式サイトよりQuick Startが提供されています。
https://kafka.apache.org/quickstart
そちらを参考にやればできるかと思います。
ただし、WindowsはQuickStartで解説されているシェルスクリプトではなくバッチファイルを使用します。
一応、windowsの方向けにコマンドと流れを解説します。
Windows向けKafka立ち上げ
まず、kafkaのパッケージをインストールし、解凍します。
その後、解凍したkafkaフォルダに入ってください。
そしたら、zookeeperを立ち上げるバッチを起動します。
.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
このようなメッセージが出れば無事立ち上げ成功です。
INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
次にkafkaを立ち上げます。
もう1つコマンドプロンプトを起動し、そこで先ほど同様kafkaフォルダに移動してください。
そこで別のバッチを起動します。
.\bin\windows\kafka-server-start.bat config\server.properties
これで問題なく立ち上がるはずです。
コマンドプロンプトからkafkaを使う方法は割愛します。
同じ要領でバッチを起動すればQuick Startと同じように動作確認ができます。
spring Kafkaを使ったConsumer,Producerの実装
kafkaが無事立ち上がったことが確認できれば、いよいよコードを書いていきましょう。
KafkaはProducerでメッセージを投げ、Consumerでメッセージを受け取ります。
今回はそれが分かるようにProducerとConsumerそれぞれ実装していきます。
まずは、Producerから作成していきます。
package com.kafka.sample.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer{
//log出力用
private static final Logger LOGGER=LoggerFactory.getLogger(Producer.class);
//送り先のTOPICを定義
private static final String TOPIC="sample";
//KafkaのkeyとValueの型を設定できます。
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
public void sendKafka(String message) {
LOGGER.info("kafka send message is {}",message);
//指定したTOPICにmessageを送信できる。
kafkaTemplate.send(TOPIC,message);
}
}
今回はKafkaにsendするものはStringのみにします。
jsonを送る場合はMessageBuilderを使用するとTopicやHeaderなども纏めることができ、楽にメッセージ作成をすることができます。
次にConsumerを作ります。
package com.kafka.sample.kafka_sample.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer{
//log出力用
private static final Logger LOGGER=LoggerFactory.getLogger(Consumer.class);
//kafkaListenerによってどのTopicの情報を取得するのか、
//どのgroupIdの情報を取得するのかを設定でき、エラーが発生した場合のハンドリングなども可能。
@KafkaListener(topics = "test",groupId = "sample")
public void consumer(String message) {
LOGGER.info("received message is {}",message);
}
}
今回はメッセージを受け取り、ログを出力するという簡単なものです。
注意点としましては、KafkaListenerアノテーションはTopicとIdを指定しないとコンパイル時にエラーになってしまうことです。VSCodeのエラーチェックでは検出されないので注意してください。
また、Listenerはkafkaにメッセージを送信することはできないので、Listenerで受け取ったメッセージを加工しkafkaに送信する際は、再度sendする必要があります。
最後に、Controller.javaでこれらを使えるように修正します。
package com.kafka.sample.kafka_sample;
import com.kafka.sample.kafka_sample.Producer.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@EnableConfigurationProperties
@RequestMapping("/kafka-sample")
public class Controller {
@Autowired
private Producer producer;
@GetMapping
public void sendMessage(@RequestParam("message") String message){
producer.sendKafka(message);
}
}
といっても今回Controllerに書くのはProducerだけで十分ですけどね。
KafkaListenerは指定したTopicでKafkaが受信したときに起動するためです。
動作確認
動かしてちゃんと動くか見てみます。VSCode内でF5を押してください。
起動に失敗しても、何度かF5を押すとうまくいきます。
うまく起動出来たら、ターミナルにはこのようなメッセージが表示されます。
INFO 15576 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [test-0]
先ほどの「Hello World!」の時と同じようにURLにアクセスします。
今回はメッセージとして「Hello Kafka」と送ってみます。
http://localhost:8080/kafka-sample/?message=Hello%20Kafka
上のような感じでURLでメッセージを飛ばします。
すると下の画面のような真っ白な画面が表示されるはずです。(何も出力していないため)
では、VSCodeに戻ってログを見てみます。
すると
ログの一番下にConsumerで記載したログが表示されていることが確認できます。
これでkafkaでのメッセージのやり取りについては終わりです。
最後に実際にコマンドプロンプトからkafkaの現在の状態を見てみましょう。
コマンドを使った確認
最初にtopicの確認から。
今回はtestというトピックを作成しました。ちゃんと作成できているのでしょうか。
コマンドプロンプトを開き、kafkaフォルダに移動後以下のコマンドを実行してください。
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
このコマンドはkafkaにあるTopicをすべて表示するコマンドです。きちんとtestと表示されていることが確認できるかと思います。
次にConsumerを見てみて、先ほど送ったメッセージがちゃんと登録されているか確認してみましょう。
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
--topicの後ろは設定したTopic名を入れてください。
すると
PS C:\Users\***\Downloads\kafka_2.12-2.3.0> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Hello Kafka
ありました!
これで無事kafkaにメッセージが送受信されていることが確認できました。
#おわり
今回はspringbootを用いたSpringKafkaでの簡単なメッセージの受け渡しについて書きました。
私が初めてKafkaに触ったとき出てきた記事がほとんど英語で辛かった思い出から書いたので、初心者向けの内容となっています。
なるべく1からできるように書いたつもりですが、もしわかりにくいところや詳しく聞きたいところがあればコメントにてアドバイスいただけますと幸いです。
最後に今回作ったソースコードはGitHubにて公開していますので、書くのがめんどくさいという方はクローンして使ってみてください。
https://github.com/ft0220/KAFKA_SAMPLE
今度はDocker使った起動方法やTestの作り方やエラーのハンドル方法などもう少し詳しい実装面のあれこれも書きたいな。