8
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

MDCAdvent Calendar 2019

Day 7

SpringBootではじめるメッセージ連携

Last updated at Posted at 2019-12-06

目次

  1. メッセージ連携とは
  2. 記事の趣旨
  3. 前提環境
  4. メッセージング基盤を構築する
  5. Sourceを作る
  6. Sinkを作る
  7. Processorを作る
  8. 動作確認
  9. まとめ

メッセージ連携とは

メッセージ連携とは、アプリケーションプログラム同士で
データや処理要求など「メッセージ」を交換することで、
複数アプリケーションを非同期に連携させるための仕組み。

マイクロサービス設計においては
メッセージを一時的に保持し、この受け渡しを仲介するミドルウェアを導入することで
各サービスを疎に保ち、スケーラビリティ向上を図る方針が採られている。

みんな大好きSpringFrameworkおよびSpringBootでは、
メッセージ連携を担うサービスの軽量開発を実現する
Spring Cloud Streamプロジェクトが注目されている。

Spring Cloud Stream

記事の趣旨

Spring Cloud Streamを使ってメッセージ連携に触れてみましょう。

完成したコードはこちら
記事では解説しないがテストクラス付き。

構成図は以下。読み進める中で、判らなくなったら見返してください。
image.png

動作イメージ知りたければ、先にページ下部の章「動作確認」を読んでください。

前提環境

  • Spring Tool Suiteが導入されていること。
  • JRE 8以上が導入されていること。
  • Dockerが導入されていること。

筆者の環境は以下。

Spring Tool Suite 3

Version: 3.9.9.RELEASE
Build Id: 201906181741
Platform: Eclipse 2019-06 (4.12.0)
$ docker version
Client: Docker Engine - Community
 Version:           19.03.2
 API version:       1.40
 Go version:        go1.12.8
(中略)
Server: Docker Engine - Community
 Engine:
  Version:          19.03.2
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.12.8
(中略)
 containerd:
  Version:          v1.2.6
(中略)
 runc:
  Version:          1.0.0-rc8
(中略)
 docker-init:
  Version:          0.18.0
(中略)
$ 
$ java --version
java 9
Java(TM) SE Runtime Environment (build 9+181)
Java HotSpot(TM) 64-Bit Server VM (build 9+181, mixed mode)
$ 

メッセージング基盤を構築する

導入が容易であることから、RabbitMQを採用する。

DockerHubからイメージをpullし、runでコンテナを起動する。
RabbitMQコンテナのマニュアルはこちら

$ docker pull rabbitmq
//起動コマンド
$ docker run -p 15672:15672 -p 5672:5672 rabbitmq:3-management

起動が完了すると、管理コンソールにアクセスできる。URLは以下。
http://localhost:15672/

IDおよびPWは、初期設定ではguest:guestとなる。
image.png

Sourceを作る

Spring Cloud Streamではサービス外、例えばREST API等からデータを受け取って
メッセージング基盤へ連携(OUTPUT)するサービスをSourceと呼ぶ。

コード全量はこちら

まずメッセージング基盤の対象キュー名を、application.propertiesに定義する。

spring.cloud.stream.bindings.output.destination=hello-processor

次に@EnableBindingアノテーションで
POJOなクラスと、メッセージング基盤の対象キューを紐づける。

加えてSourceオブジェクトをDIし
Source.output.send()メソッドを呼び出してメッセージを送信する。

前略	
@EnableBinding(Source.class)
public class HelloSourceApplication {
	private final Source source;
	
	public HelloSourceApplication(Source source) {
		this.source=source;
	}
中略	
	@PostMapping
	public void tweet(@RequestBody Tweet tweet) {
		source.output().send(MessageBuilder.withPayload(tweet).build());
	}
	
	public static class Tweet {
		public String tweet;
	}

}

Sinkを作る

Sourceとは逆に、メッセージング基盤から連携(INPUT)を受けて
バックエンドにデータを連携するサービスを
Spring Cloud StreamではSinkと呼ぶ。

コード全量はこちら

まずメッセージング基盤の対象キュー名を、application.propertiesに定義する。

spring.cloud.stream.bindings.input.destination=hello-sink

次に@EnableBindingアノテーションで
POJOなクラスと、メッセージング基盤の対象キューを紐づける。
また@StreamListenerアノテーションにより、
キューから受け取ったメッセージを直接、DTOと紐づける。

今回は、受け取ったDTOを標準出力に出して終了する。

前略	
@EnableBinding(Sink.class)
public class HelloSinkApplication {
中略	
	@StreamListener(Sink.INPUT)
	public void print(Tweet tweet) {
		System.out.println("Received " + tweet.tweet);
	}

	public static class Tweet {
		public String tweet;
	}
}

Processorを作る

メッセージング基盤から連携(INPUT)を受けて
メッセージング基盤へ連携(OUTPUT)するサービスをProcessorと呼ぶ。

コード全量はこちら

まずメッセージング基盤の対象キュー名をapplication.propertiesに定義する。
今回の例では、OUTPUTにSinkサービスが購読する対象キュー名を定義している。
またINPUTとして、Sourceがメッセージを送る先の対象キュー名を定義している。

spring.cloud.stream.bindings.output.destination=hello-sink

spring.cloud.stream.bindings.input.destination=hello-processor

次に@EnableBindingアノテーションで
POJOなクラスと、メッセージング基盤の対象キューを紐づける。

更に、@StreamListenerアノテーションを用いて、受け取ったメッセージをDTOに紐づけ。
加えて@SendToアノテーションで
メッセージを送る対象キューおよびメッセージ本文を、メソッド戻り値であるDTOに紐づけ。

結果として、メッセージを受信したら
その値を加工(後ろに" processing!"を付与)して、メッセージを送信する挙動となる。

前略	
@EnableBinding(Processor.class)
public class HelloProcessorApplication {
中略	
	@StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
	public Tweet transform(Tweet tweet) {
		tweet.tweet += " processing!";
		return tweet;
	}
	
	public static class Tweet {
		public String tweet;
	}
}

動作確認

それぞれのプロジェクトについてmaven clean packageを実行。
target配下に生成された各Jarを以下のとおり起動する。

$ java -jar target/hello-source-0.0.1-SNAPSHOT.jar --server.port=8080
$ java -jar target/hello-sink-0.0.1-SNAPSHOT.jar --server.port=8082
$ java -jar target/hello-processor-0.0.1-SNAPSHOT.jar --server.port=8090

curlで、Sourceに対してPOSTリクエストを送る。

$ curl -v localhost:8080 -d '{"tweet":"Hello"}' -H 'Content-Type: application/json'

SinkのJarを起動したコンソールに、メッセージが出力される。

Received Hello processing!

まとめ

SpringBootならびにRabbitMQ、Spring Cloud Streamを用いて
マイクロサービスなメッセージ連携の設計/実装方法を学んだ。

是非それぞれのサービスを沢山、起動して遊んで頂きたい。

参考文献

Stream Processing with RabbitMQ

Spring Cloud Streamでマイクロサービス間メッセージング

Spring Cloud Stream Tutorial

8
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?