目次
- メッセージ連携とは
- 記事の趣旨
- 前提環境
- メッセージング基盤を構築する
- Sourceを作る
- Sinkを作る
- Processorを作る
- 動作確認
- まとめ
メッセージ連携とは
メッセージ連携とは、アプリケーションプログラム同士で
データや処理要求など「メッセージ」を交換することで、
複数アプリケーションを非同期に連携させるための仕組み。
マイクロサービス設計においては
メッセージを一時的に保持し、この受け渡しを仲介するミドルウェアを導入することで
各サービスを疎に保ち、スケーラビリティ向上を図る方針が採られている。
みんな大好きSpringFrameworkおよびSpringBootでは、
メッセージ連携を担うサービスの軽量開発を実現する
Spring Cloud Streamプロジェクトが注目されている。
記事の趣旨
Spring Cloud Streamを使ってメッセージ連携に触れてみましょう。
完成したコードはこちら。
記事では解説しないがテストクラス付き。
構成図は以下。読み進める中で、判らなくなったら見返してください。
動作イメージ知りたければ、先にページ下部の章「動作確認」を読んでください。
前提環境
- Spring Tool Suiteが導入されていること。
- JRE 8以上が導入されていること。
- Dockerが導入されていること。
筆者の環境は以下。
Spring Tool Suite 3
Version: 3.9.9.RELEASE
Build Id: 201906181741
Platform: Eclipse 2019-06 (4.12.0)
$ docker version
Client: Docker Engine - Community
Version: 19.03.2
API version: 1.40
Go version: go1.12.8
(中略)
Server: Docker Engine - Community
Engine:
Version: 19.03.2
API version: 1.40 (minimum version 1.12)
Go version: go1.12.8
(中略)
containerd:
Version: v1.2.6
(中略)
runc:
Version: 1.0.0-rc8
(中略)
docker-init:
Version: 0.18.0
(中略)
$
$ java --version
java 9
Java(TM) SE Runtime Environment (build 9+181)
Java HotSpot(TM) 64-Bit Server VM (build 9+181, mixed mode)
$
メッセージング基盤を構築する
導入が容易であることから、RabbitMQを採用する。
DockerHubからイメージをpullし、runでコンテナを起動する。
RabbitMQコンテナのマニュアルはこちら。
$ docker pull rabbitmq
//起動コマンド
$ docker run -p 15672:15672 -p 5672:5672 rabbitmq:3-management
起動が完了すると、管理コンソールにアクセスできる。URLは以下。
http://localhost:15672/
IDおよびPWは、初期設定ではguest:guestとなる。
Sourceを作る
Spring Cloud Streamではサービス外、例えばREST API等からデータを受け取って
メッセージング基盤へ連携(OUTPUT)するサービスをSourceと呼ぶ。
コード全量はこちら。
まずメッセージング基盤の対象キュー名を、application.propertiesに定義する。
spring.cloud.stream.bindings.output.destination=hello-processor
次に@EnableBindingアノテーションで
POJOなクラスと、メッセージング基盤の対象キューを紐づける。
加えてSourceオブジェクトをDIし
Source.output.send()メソッドを呼び出してメッセージを送信する。
(前略)
@EnableBinding(Source.class)
public class HelloSourceApplication {
private final Source source;
public HelloSourceApplication(Source source) {
this.source=source;
}
(中略)
@PostMapping
public void tweet(@RequestBody Tweet tweet) {
source.output().send(MessageBuilder.withPayload(tweet).build());
}
public static class Tweet {
public String tweet;
}
}
Sinkを作る
Sourceとは逆に、メッセージング基盤から連携(INPUT)を受けて
バックエンドにデータを連携するサービスを
Spring Cloud StreamではSinkと呼ぶ。
コード全量はこちら。
まずメッセージング基盤の対象キュー名を、application.propertiesに定義する。
spring.cloud.stream.bindings.input.destination=hello-sink
次に@EnableBindingアノテーションで
POJOなクラスと、メッセージング基盤の対象キューを紐づける。
また@StreamListenerアノテーションにより、
キューから受け取ったメッセージを直接、DTOと紐づける。
今回は、受け取ったDTOを標準出力に出して終了する。
(前略)
@EnableBinding(Sink.class)
public class HelloSinkApplication {
(中略)
@StreamListener(Sink.INPUT)
public void print(Tweet tweet) {
System.out.println("Received " + tweet.tweet);
}
public static class Tweet {
public String tweet;
}
}
Processorを作る
メッセージング基盤から連携(INPUT)を受けて
メッセージング基盤へ連携(OUTPUT)するサービスをProcessorと呼ぶ。
コード全量はこちら
まずメッセージング基盤の対象キュー名をapplication.propertiesに定義する。
今回の例では、OUTPUTにSinkサービスが購読する対象キュー名を定義している。
またINPUTとして、Sourceがメッセージを送る先の対象キュー名を定義している。
spring.cloud.stream.bindings.output.destination=hello-sink
spring.cloud.stream.bindings.input.destination=hello-processor
次に@EnableBindingアノテーションで
POJOなクラスと、メッセージング基盤の対象キューを紐づける。
更に、@StreamListenerアノテーションを用いて、受け取ったメッセージをDTOに紐づけ。
加えて@SendToアノテーションで
メッセージを送る対象キューおよびメッセージ本文を、メソッド戻り値であるDTOに紐づけ。
結果として、メッセージを受信したら
その値を加工(後ろに" processing!"を付与)して、メッセージを送信する挙動となる。
(前略)
@EnableBinding(Processor.class)
public class HelloProcessorApplication {
(中略)
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Tweet transform(Tweet tweet) {
tweet.tweet += " processing!";
return tweet;
}
public static class Tweet {
public String tweet;
}
}
動作確認
それぞれのプロジェクトについてmaven clean packageを実行。
target配下に生成された各Jarを以下のとおり起動する。
$ java -jar target/hello-source-0.0.1-SNAPSHOT.jar --server.port=8080
$ java -jar target/hello-sink-0.0.1-SNAPSHOT.jar --server.port=8082
$ java -jar target/hello-processor-0.0.1-SNAPSHOT.jar --server.port=8090
curlで、Sourceに対してPOSTリクエストを送る。
$ curl -v localhost:8080 -d '{"tweet":"Hello"}' -H 'Content-Type: application/json'
SinkのJarを起動したコンソールに、メッセージが出力される。
Received Hello processing!
まとめ
SpringBootならびにRabbitMQ、Spring Cloud Streamを用いて
マイクロサービスなメッセージ連携の設計/実装方法を学んだ。
是非それぞれのサービスを沢山、起動して遊んで頂きたい。
参考文献
Stream Processing with RabbitMQ