6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

MDCAdvent Calendar 2019

Day 11

SpringBootではじめるメッセージ連携 Apache Kafka編

Last updated at Posted at 2019-12-10

目次

  1. 前回までのあらすじ
  2. 記事の趣旨
  3. 前提環境
  4. メッセージング基盤を構築する(Kafka編)
  5. Sink,Source,Processorを作る(Kafka編)
  6. 動作確認
  7. まとめ

前回までのあらすじ

メッセージ駆動システムの設計実装について、第一歩を踏み出しました。
SpringBootではじめるメッセージ連携

記事の趣旨

二歩目を踏み出しましょう。

お手軽に、メッセージ駆動の恩恵を受けられるようになった一方で
RabbitMQはクラスタ構成が組めないので
スケール上限や、ノードレベル耐障害性の確保などはApache Kafkaに見劣りする様子。

せっかくなのでApache Kafkaにも触れてみましょう。

完成したコードはこちら
記事では解説しないがテストクラス付き。

前提環境

  • Spring Tool Suiteが導入されていること。
  • JRE 8以上が導入されていること。

Dockerは今回は使いません。
その他は、前回と同様のためバージョン情報は割愛。

メッセージング基盤を構築する(Kafka編)

前述のとおりApache Kafkaを採用する。

Apache Kafkaの公式サイトにquickstartが掲載されているので
この手順に則って導入します。

まずはApache Kafka配布ページよりHTTPリンクを入手。

アクセスするたびにsuggest対象のURLが変わるページ構成。
特に理由なければ「We suggest~」に表示されるURLをメモすること。負荷分散に協力しましょう。

以下ご参考。meisei-u.ac.jpがsuggestされた場合のスクリーンショット。
スクリーンショット 2019-12-07 22.34.10.png

次に、ダウンロードしたファイルを展開して下位フォルダに移動。

# wget [kafka.tgzのURL]
# cd [解凍後フォルダ]

起動コマンドを投入します。まずはZooKeeper。

# bin/zookeeper-server-start.sh config/zookeeper.properties

次にKafka本体。

# bin/kafka-server-start.sh config/server.properties

少し、動作確認しましょう。
testという名前のtopic(メッセージの送信先)を作成。

# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

topic一覧を表示。topicが出来たことを確認します。

# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

これまでの手順により、Kafkaでメッセージを処理する準備が整いました。

Sink,Source,Processorを作る(Kafka編)

前回と同様にSpring Cloud Streamを用いて
SinkとSourceおよびProcessorを作りますが...
実装コードは1行も変更なし。単体テストも同一ソースです。

前回のコードはこちら
今回のコードはこちら

唯一、pom.xmlのみ修正しています。
RabbitMQ関連を削除し、Apache Kafka関連を追加。
完成したpom.xmlを前回と比較いただければ判るので、仔細は割愛。

最も重要な変更は以下。ガイドはこちら

(中略)
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>
(中略)

アプリには特段、プロパティ値を設定していないので
デフォルトに基づいてkafkaへの接続先情報(localhost:9092)や各種パラメータが設定されることがポイント。

動作確認

それぞれのプロジェクトについてmaven clean packageを実行。
target配下に生成された各Jarを以下のとおり起動する。

$ java -jar target/hello-source-kafka-0.0.1-SNAPSHOT.jar --server.port=8080
$ java -jar target/hello-sink-kafka-0.0.1-SNAPSHOT.jar  --server.port=8082
$ java -jar target/hello-processor-kafka-0.0.1-SNAPSHOT.jar --server.port=8090

起動ログ中に、Kafkaを用いていることが表示される。
以下はSinkのログ出力例。

2019-12-07 12:45:28.595  INFO 4986 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: hello-sink
2019-12-07 12:45:28.600  INFO 4986 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
	bootstrap.servers = [localhost:9092]
(中略)
	ssl.truststore.type = JKS
2019-12-07 12:45:28.707  INFO 4986 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2019-12-07 12:45:28.709  INFO 4986 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2019-12-07 12:45:28.710  INFO 4986 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1575722728705

curlで、Sourceに対してPOSTリクエストを送る。

$ curl -v localhost:8080 -d '{"tweet":"Hello"}' -H 'Content-Type: application/json'

SinkのJarを起動したコンソールに、メッセージが出力される。

Received Hello processing!

まとめ

SpringBootならびにApache Kafka、Spring Cloud Streamを用いて
マイクロサービスなメッセージ連携の設計/実装方法を学んだ。

Spring Cloud Streamを用いることで
メッセージング基盤への依存性を、アプリ設計や実装から取り除くことが可能。

是非それぞれのメッセージング基盤を沢山、取っ替え引っ替えして遊んで頂きたい。

参考文献

Spring Cloud Stream Kafka Binder Reference Guide

Apache Kafka QuickStart

Apache Kafkaを試してみる

6
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?