30
25

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Spring Boot 2.2でRSocketがサポートされたので試してみる

Posted at

こんにちは。Spring Boot 2.2でRSocketがサポートされたので、試してみました。

RSocketとは?

  • マイクロサービス間の通信用に設計されたバイナリプロトコル。
  • Reactive Socket。Reactive Streamsをサポート。
  • TCP、WebSocket、Aeron、そしてHTTP/2 streamsをトランスポートプロトコルとして使用できる。
  • 以下の4つのインタラクションモデルが使用できる。
    • request-response (1リクエスト、1レスポンス)
    • request-stream (1リクエスト、複数レスポンス)
    • fire-and-forget (1リクエスト、Noレスポンス
    • channel (複数リクエスト、複数レスポンス)
  • 開発言語に依存しない。Java、JavaScriot、GO、.NET など
  • Spring 5.2、Spring Boot 2.2から正式サポート。
  • 公式サイト : RSocketの公式サイト

他のプロトコルとの比較

vs REST

  • RSocketは、RESTよりも通信上のオーバーヘッドが少ない。
  • RESTで上記4つのインタラクションを実現したい場合、ポーリングなどの仕組みが必要。

vs gRPC

  • gRPCはブラウザとの通信がサポートされていない。ブラウザと通信したい場合、RESTに変換するプロキシが必要。

詳しくは次のサイトを参照してください。

早速、試してみる

公式サイトなどを見ながらざっくり概要は理解できたので、簡単なアプリケーションを作成します。
RSocketはサービス間の通信プロトコルなので、

  • リクエストを送る側(rsocket-client)
  • リクエストを受けてレスポンスを返す側(rsocket-backend)

の2つが必要です。rsocket-client、rsocket-backendどちらもサーバとして作成し、とりあえず今回はサーバ間の通信を試します。アプリケーションのシーケンス図は以下のとおりです。

スクリーンショット 2020-01-08 22.20.47.png

ブラウザでアクセスして結果を見たかったので、rsocket-clientはHTTPのエンドポイントを持つアプリケーションにしています。
1〜4が "request-response"で、5〜9が"request-stream"のインタラクションです。
9のレスポンスは随時返却される動作にしたかったので、SSE(Server-Sent Events)を使用しました。

プロジェクト構成

rsocket-client、rsocket-backendはそれぞれサブプロジェクトとして作成します。
またリクエスト、レスポンスとして使用するデータモデル(DTO)はそれぞれのプロジェクトで定義したくなかったので、これもサブプロジェクト(rsocket-model)で作成します。

|- build.gradle
|- rsocket-model/
|  |- build.gradle
|  :
|- rsocket-client/
|  |- build.gradle
|  :
└  rsocket-backend/
   |- build.gradle
   :

共通のbuild.gradleはこんな感じ。
spring-boot-starter-rsocketspring-boot-starter-webflux をdependencyに追加します。

build.gradle
buildscript {
	repositories {
		mavenCentral()
		maven { url "https://plugins.gradle.org/m2/" }
	}

	dependencies {
		classpath "org.springframework.boot:spring-boot-gradle-plugin:2.2.2.RELEASE"
		classpath "io.spring.gradle:dependency-management-plugin:1.0.8.RELEASE"
	}
}

allprojects {
	repositories {
		mavenCentral()
	}
}

subprojects {
	group = 'sandbox'
	version = '0.0.1-SNAPSHOT'

	apply plugin: "java"
	apply plugin: "java-library"
	apply plugin: "org.springframework.boot"
	apply plugin: "io.spring.dependency-management"

	sourceCompatibility = JavaVersion.VERSION_11
	targetCompatibility = JavaVersion.VERSION_11

	dependencyManagement {
		dependencies {
			dependency "org.springframework.boot:spring-boot-starter-rsocket:2.2.2.RELEASE"
			dependency "org.springframework.boot:spring-boot-starter-webflux:2.2.2.RELEASE"
			dependency "org.springframework.boot:spring-boot-devtools:2.2.2.RELEASE"
		}
	}
}

rsocket-model

rsocket-modelのbuild.gradle。

build.gradle
project(":rsocket-model") {
    dependencies {
        compileOnly("org.projectlombok:lombok")
        annotationProcessor("org.projectlombok:lombok")
    }
    bootJar {
        enabled = false
    }
    jar {
        enabled = true
    }
}

リクエストのDTOの定義。
Javaオブジェクトのエンコード、デコードにはJacksonが使用されるみたいなので、デフォルトコンストラクタの作成も忘れずに。

RequestData.java
package sandbox.rsocket;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class RequestData {
    String message;
}

レスポンスのDTOの定義。

ResponseData.java
package sandbox.rsocket;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class ResponseData {
    String message;
}

rsocket-bankend

rsocket-bankendのbuild.gradle。

rsocket-bankend/build.gradle
project(":rsocket-backend") {
    dependencies {
        implementation project(':rsocket-model')
        implementation 'org.springframework.boot:spring-boot-starter-rsocket'
        implementation 'org.springframework.boot:spring-boot-starter-webflux'
        implementation 'org.springframework.boot:spring-boot-devtools'

        compileOnly("org.projectlombok:lombok")
        annotationProcessor("org.projectlombok:lombok")
    }
}

application.yml にRSocketで使用するポート番号(7000)を指定します。
トランスポートプロトコルの指定もYAMLから行うことができます。この辺りの定義は、Spring Bootのドキュメントに書かれています。
8.2. RSocket server Auto-configuration

application.yml
spring:
  rsocket:
    server:
      port: 7000
      # Remove commented out if enable RSocket over websocket. (Using tcp as default.)
      # See the link for the details. https://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/html/spring-boot-features.html#boot-features-rsocket-strategies-auto-configuration
      # mapping-path: /rsocket
      # transport: websocket

RSocketサーバのコントローラの定義。
@MessageMappingアノテーションでRSocketのエンドポイント名を設定します。
getMonoは単一データを返却するのでMonogetFluxは複数データ(stream)を返却するのでFluxにレスポンスを格納します。

RSocketServerController.java
package sandbox.rsocket;

import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@AllArgsConstructor
@Controller
public class RSocketServerController {
    /**
     * Get response data with mono.
     */
    @MessageMapping("getMono")
    public Mono<ResponseData> getMono(RequestData requestData) {
        log.info("Calling getMono method. request={}", requestData);
        return Mono.just(new ResponseData(requestData.getMessage()));
    }

    /**
     * Get response data with flux.
     * Responds one of the response data every seconds.
     */
    @MessageMapping("getFlux")
    public Flux<ResponseData> getFlux(RequestData requestData) {
        log.info("Calling getFlux method. request={}", requestData);
        final List<ResponseData> list =
                IntStream.rangeClosed(1, 10)
                         .boxed()
                         .map(i -> new ResponseData(requestData.getMessage() + '_' + i))
                         .collect(Collectors.toList());
        return Flux.fromIterable(list)
                   .delayElements(Duration.ofSeconds(1));
    }
}

RSocket-client

rsocket-clientのbuild.gradle。

rsocket-client/build.gradle
project(":rsocket-client") {
    dependencies {
        implementation project(':rsocket-model')
        implementation 'org.springframework.boot:spring-boot-starter-rsocket'
        implementation 'org.springframework.boot:spring-boot-starter-webflux'
        implementation 'org.springframework.boot:spring-boot-devtools'

        compileOnly("org.projectlombok:lombok")
        annotationProcessor("org.projectlombok:lombok")
    }
}

application.yml にはHTTPのエンドポイントのポート番号(8081)を指定します。
ちなみに、クライアント側のログレベルをDEBUGにすると、RSocketのフレーム情報をコンソールに出力することができます。

application.yml
server:
  port: 8081
# Remove commented out if u want to see RSocket frame on console log.
# logging:
#  level:
#    root: DEBUG

以下はgetMonoにリクエストしたときのコンソールに出力されたDEBUGログです。それっぽいログがでてますね!

2020-01-08 23:15:00.853 DEBUG 6776 --- [ctor-http-nio-2] o.s.http.codec.cbor.Jackson2CborEncoder  : Encoding [RequestData(message=test)]
2020-01-08 23:15:00.879 DEBUG 6776 --- [actor-tcp-nio-1] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 36
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 08 07 67 65 74 4d 6f 6e 6f             |.....getMono    |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| bf 67 6d 65 73 73 61 67 65 64 74 65 73 74 ff    |.gmessagedtest. |
+--------+-------------------------------------------------+----------------+
2020-01-08 23:15:01.015 DEBUG 6776 --- [actor-tcp-nio-1] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 21
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| bf 67 6d 65 73 73 61 67 65 64 74 65 73 74 ff    |.gmessagedtest. |
+--------+-------------------------------------------------+----------------+
2020-01-08 23:15:01.023 DEBUG 6776 --- [actor-tcp-nio-1] o.s.http.codec.cbor.Jackson2CborDecoder  : Decoded [ResponseData(message=test)]
2020-01-08 23:15:01.023 DEBUG 6776 --- [actor-tcp-nio-1] o.s.http.codec.json.Jackson2JsonEncoder  : [e539e702] Encoding [ResponseData(message=test)]

クライアント用のConfigurationの定義。RSocketRequesterにRSocketサーバのホスト名(localhost)とポート番号(7000)を設定します。Builderクラスが用意されているので、数行で書けます。

ClientConfiguration.java
package sandbox.rsocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@AllArgsConstructor
@Configuration
public class ClientConfiguration {
    private final RSocketRequester.Builder builder;

    @Bean
    public RSocketRequester rSocketRequester() {
        return builder.connectTcp("localhost", 7000)
                      .doOnNext(socket -> log.info("Connected to RSocket."))
                      .block();
    }
}

RSocketクライアントのサービスクラスの定義。RSocketサーバのエンドポイント名はここで設定します。

RSocketClientService.java
package sandbox.rsocket;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;

import lombok.AllArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@AllArgsConstructor
@Service
public class RSocketClientService {
    private final RSocketRequester rSocketRequester;

    public Mono<ResponseData> getMono(RequestData data) {
        return rSocketRequester.route("getMono")
                               .data(data)
                               .retrieveMono(ResponseData.class);
    }

    public Flux<ResponseData> getFlux(RequestData data) {
        return rSocketRequester.route("getFlux")
                               .data(data)
                               .retrieveFlux(ResponseData.class);
    }
}

RSocketクライアントのHTTPのエンドポイントの定義。

RSocketClientController.java
package sandbox.rsocket;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import lombok.AllArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@AllArgsConstructor
@RestController
public class RSocketClientController {
    private final RSocketClientService clientService;

    /**
     * Get response mono data.
     */
    @GetMapping("/mono")
    public Mono<ResponseData> mono(@RequestParam String message) {
        return clientService.getMono(new RequestData(message));
    }

    /**
     * Get response flux data with server sent events.
     */
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/flux")
    public Flux<ResponseData> flux(@RequestParam String message) {
        return clientService.getFlux(new RequestData(message));
    }
}

rsocket-bankend、rsocket-clientをそれぞれbootRunして、HTTPのエンドポイントにアクセスします。
以下は、http://localhost:8081/fluxにアクセスしたときの結果です。1秒ごとにデータがレスポンスされていますね!

Flux_demo.gif

まとめ

  • Spring BootとRSocketを使った簡単なアプリケーションを作成して、サーバ間の通信を試してみました。
  • 非常に少ないコードで簡単に作成することはできましたが、ブラックボックスな部分が多く、実際使うときは詳細な理解が必要だな。と思いました。(小並感)
  • あと、この記事には書いてありませんが、"RSocket over WebSocket"でブラウザとの通信も試してみました。Webアプリ開発者としては、RESTに変わる手段ができてとてもうれしい感があります。(これも小並感)
  • 今回作成したソースコードはGitHub: RSocket sandbox に保存しています。参考にしていただければ幸いです。

参考

30
25
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
30
25

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?