こんにちは。Spring Boot 2.2でRSocketがサポートされたので、試してみました。
RSocketとは?
- マイクロサービス間の通信用に設計されたバイナリプロトコル。
- Reactive Socket。Reactive Streamsをサポート。
- TCP、WebSocket、Aeron、そしてHTTP/2 streamsをトランスポートプロトコルとして使用できる。
- 以下の4つのインタラクションモデルが使用できる。
- request-response (1リクエスト、1レスポンス)
- request-stream (1リクエスト、複数レスポンス)
- fire-and-forget (1リクエスト、Noレスポンス
- channel (複数リクエスト、複数レスポンス)
- 開発言語に依存しない。Java、JavaScriot、GO、.NET など
- Spring 5.2、Spring Boot 2.2から正式サポート。
- 公式サイト : RSocketの公式サイト
他のプロトコルとの比較
vs REST
- RSocketは、RESTよりも通信上のオーバーヘッドが少ない。
- RESTで上記4つのインタラクションを実現したい場合、ポーリングなどの仕組みが必要。
vs gRPC
- gRPCはブラウザとの通信がサポートされていない。ブラウザと通信したい場合、RESTに変換するプロキシが必要。
詳しくは次のサイトを参照してください。
早速、試してみる
公式サイトなどを見ながらざっくり概要は理解できたので、簡単なアプリケーションを作成します。
RSocketはサービス間の通信プロトコルなので、
- リクエストを送る側(rsocket-client)
- リクエストを受けてレスポンスを返す側(rsocket-backend)
の2つが必要です。rsocket-client、rsocket-backendどちらもサーバとして作成し、とりあえず今回はサーバ間の通信を試します。アプリケーションのシーケンス図は以下のとおりです。

ブラウザでアクセスして結果を見たかったので、rsocket-clientはHTTPのエンドポイントを持つアプリケーションにしています。
1〜4が "request-response"で、5〜9が"request-stream"のインタラクションです。
9のレスポンスは随時返却される動作にしたかったので、SSE(Server-Sent Events)を使用しました。
プロジェクト構成
rsocket-client、rsocket-backendはそれぞれサブプロジェクトとして作成します。
またリクエスト、レスポンスとして使用するデータモデル(DTO)はそれぞれのプロジェクトで定義したくなかったので、これもサブプロジェクト(rsocket-model)で作成します。
|- build.gradle
|- rsocket-model/
| |- build.gradle
| :
|- rsocket-client/
| |- build.gradle
| :
└ rsocket-backend/
|- build.gradle
:
共通のbuild.gradleはこんな感じ。
spring-boot-starter-rsocket
と spring-boot-starter-webflux
をdependencyに追加します。
buildscript {
repositories {
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
}
dependencies {
classpath "org.springframework.boot:spring-boot-gradle-plugin:2.2.2.RELEASE"
classpath "io.spring.gradle:dependency-management-plugin:1.0.8.RELEASE"
}
}
allprojects {
repositories {
mavenCentral()
}
}
subprojects {
group = 'sandbox'
version = '0.0.1-SNAPSHOT'
apply plugin: "java"
apply plugin: "java-library"
apply plugin: "org.springframework.boot"
apply plugin: "io.spring.dependency-management"
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
dependencyManagement {
dependencies {
dependency "org.springframework.boot:spring-boot-starter-rsocket:2.2.2.RELEASE"
dependency "org.springframework.boot:spring-boot-starter-webflux:2.2.2.RELEASE"
dependency "org.springframework.boot:spring-boot-devtools:2.2.2.RELEASE"
}
}
}
rsocket-model
rsocket-modelのbuild.gradle。
project(":rsocket-model") {
dependencies {
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
}
bootJar {
enabled = false
}
jar {
enabled = true
}
}
リクエストのDTOの定義。
Javaオブジェクトのエンコード、デコードにはJacksonが使用されるみたいなので、デフォルトコンストラクタの作成も忘れずに。
package sandbox.rsocket;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
public class RequestData {
String message;
}
レスポンスのDTOの定義。
package sandbox.rsocket;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
public class ResponseData {
String message;
}
rsocket-bankend
rsocket-bankendのbuild.gradle。
project(":rsocket-backend") {
dependencies {
implementation project(':rsocket-model')
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-devtools'
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
}
}
application.yml にRSocketで使用するポート番号(7000)を指定します。
トランスポートプロトコルの指定もYAMLから行うことができます。この辺りの定義は、Spring Bootのドキュメントに書かれています。
8.2. RSocket server Auto-configuration
spring:
rsocket:
server:
port: 7000
# Remove commented out if enable RSocket over websocket. (Using tcp as default.)
# See the link for the details. https://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/html/spring-boot-features.html#boot-features-rsocket-strategies-auto-configuration
# mapping-path: /rsocket
# transport: websocket
RSocketサーバのコントローラの定義。
@MessageMapping
アノテーションでRSocketのエンドポイント名を設定します。
getMono
は単一データを返却するのでMono
、getFlux
は複数データ(stream)を返却するのでFlux
にレスポンスを格納します。
package sandbox.rsocket;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@AllArgsConstructor
@Controller
public class RSocketServerController {
/**
* Get response data with mono.
*/
@MessageMapping("getMono")
public Mono<ResponseData> getMono(RequestData requestData) {
log.info("Calling getMono method. request={}", requestData);
return Mono.just(new ResponseData(requestData.getMessage()));
}
/**
* Get response data with flux.
* Responds one of the response data every seconds.
*/
@MessageMapping("getFlux")
public Flux<ResponseData> getFlux(RequestData requestData) {
log.info("Calling getFlux method. request={}", requestData);
final List<ResponseData> list =
IntStream.rangeClosed(1, 10)
.boxed()
.map(i -> new ResponseData(requestData.getMessage() + '_' + i))
.collect(Collectors.toList());
return Flux.fromIterable(list)
.delayElements(Duration.ofSeconds(1));
}
}
RSocket-client
rsocket-clientのbuild.gradle。
project(":rsocket-client") {
dependencies {
implementation project(':rsocket-model')
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-devtools'
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
}
}
application.yml にはHTTPのエンドポイントのポート番号(8081)を指定します。
ちなみに、クライアント側のログレベルをDEBUGにすると、RSocketのフレーム情報をコンソールに出力することができます。
server:
port: 8081
# Remove commented out if u want to see RSocket frame on console log.
# logging:
# level:
# root: DEBUG
以下はgetMono
にリクエストしたときのコンソールに出力されたDEBUGログです。それっぽいログがでてますね!
2020-01-08 23:15:00.853 DEBUG 6776 --- [ctor-http-nio-2] o.s.http.codec.cbor.Jackson2CborEncoder : Encoding [RequestData(message=test)]
2020-01-08 23:15:00.879 DEBUG 6776 --- [actor-tcp-nio-1] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 36
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 08 07 67 65 74 4d 6f 6e 6f |.....getMono |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| bf 67 6d 65 73 73 61 67 65 64 74 65 73 74 ff |.gmessagedtest. |
+--------+-------------------------------------------------+----------------+
2020-01-08 23:15:01.015 DEBUG 6776 --- [actor-tcp-nio-1] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 21
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| bf 67 6d 65 73 73 61 67 65 64 74 65 73 74 ff |.gmessagedtest. |
+--------+-------------------------------------------------+----------------+
2020-01-08 23:15:01.023 DEBUG 6776 --- [actor-tcp-nio-1] o.s.http.codec.cbor.Jackson2CborDecoder : Decoded [ResponseData(message=test)]
2020-01-08 23:15:01.023 DEBUG 6776 --- [actor-tcp-nio-1] o.s.http.codec.json.Jackson2JsonEncoder : [e539e702] Encoding [ResponseData(message=test)]
クライアント用のConfigurationの定義。RSocketRequester
にRSocketサーバのホスト名(localhost)とポート番号(7000)を設定します。Builderクラスが用意されているので、数行で書けます。
package sandbox.rsocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@AllArgsConstructor
@Configuration
public class ClientConfiguration {
private final RSocketRequester.Builder builder;
@Bean
public RSocketRequester rSocketRequester() {
return builder.connectTcp("localhost", 7000)
.doOnNext(socket -> log.info("Connected to RSocket."))
.block();
}
}
RSocketクライアントのサービスクラスの定義。RSocketサーバのエンドポイント名はここで設定します。
package sandbox.rsocket;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;
import lombok.AllArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@AllArgsConstructor
@Service
public class RSocketClientService {
private final RSocketRequester rSocketRequester;
public Mono<ResponseData> getMono(RequestData data) {
return rSocketRequester.route("getMono")
.data(data)
.retrieveMono(ResponseData.class);
}
public Flux<ResponseData> getFlux(RequestData data) {
return rSocketRequester.route("getFlux")
.data(data)
.retrieveFlux(ResponseData.class);
}
}
RSocketクライアントのHTTPのエンドポイントの定義。
package sandbox.rsocket;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import lombok.AllArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@AllArgsConstructor
@RestController
public class RSocketClientController {
private final RSocketClientService clientService;
/**
* Get response mono data.
*/
@GetMapping("/mono")
public Mono<ResponseData> mono(@RequestParam String message) {
return clientService.getMono(new RequestData(message));
}
/**
* Get response flux data with server sent events.
*/
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/flux")
public Flux<ResponseData> flux(@RequestParam String message) {
return clientService.getFlux(new RequestData(message));
}
}
rsocket-bankend、rsocket-clientをそれぞれbootRunして、HTTPのエンドポイントにアクセスします。
以下は、http://localhost:8081/flux
にアクセスしたときの結果です。1秒ごとにデータがレスポンスされていますね!
まとめ
- Spring BootとRSocketを使った簡単なアプリケーションを作成して、サーバ間の通信を試してみました。
- 非常に少ないコードで簡単に作成することはできましたが、ブラックボックスな部分が多く、実際使うときは詳細な理解が必要だな。と思いました。(小並感)
- あと、この記事には書いてありませんが、"RSocket over WebSocket"でブラウザとの通信も試してみました。Webアプリ開発者としては、RESTに変わる手段ができてとてもうれしい感があります。(これも小並感)
- 今回作成したソースコードはGitHub: RSocket sandbox に保存しています。参考にしていただければ幸いです。