LoginSignup
1
0

More than 1 year has passed since last update.

Spring WebFlux WEBサービスで Server-Sent Events を使用する

Last updated at Posted at 2023-02-21

Spring WebFlux WEBサービスで Server-Sent Events を使用する

目的

Spring WebFlux で SSE 対応のWEBサービスを作成して理解を深めます。

実現すること

ローカル環境 (Ubuntu) で Spring WebFlux アプリをビルド・起動させます。
そのWEBサービスに curl コマンドからアクセスして Server-Sent Events の動作を確認します。

技術背景

Spring WebFlux とは?

こちらを展開してご覧いただけます。

Spring WebFlux

非同期・反応的な Webアプリケーションを開発するための Spring Framework のモジュールです。

Spring WebFlux の主な特徴とメリットは以下のとおりです。

非同期・反応的なリクエスト処理

Spring WebFlux は、リアクティブストリームと呼ばれる仕組みを使用して、リクエストを非同期で処理することができます。これにより、少ないリソースで高いスループットを実現することができます。

WebFlux サーバー

Spring WebFlux には、Netty をベースにした WebFlux サーバーが含まれています。このサーバーは、高いスループットと低いレイテンシーを実現することができます。

Reactive Streams API のサポート

Spring WebFlux は、Reactive Streams API と呼ばれる仕様に準拠しており、他の Reactive Streams API を使用するライブラリと統合することができます。これにより、より柔軟で拡張性の高いアプリケーションを開発することができます。

Server-Sent Events (SSE) とは?

こちらを展開してご覧いただけます。

Server-Sent Events

Server-Sent Events (SSE) は、サーバーからの単方向のリアルタイム通信に特化した技術であり、サーバーがクライアントに対してリアルタイムに更新情報をプッシュすることができます。 長時間接続を維持することで、クライアントがサーバーからの更新情報をリアルタイムで受け取ることができます。

SSE は、リアルタイムの情報配信が必要なウェブアプリケーションに使用されます。例えば、以下のような状況で使用されることがあります。

ソーシャルメディアのリアルタイム更新

ソーシャルメディアのようなウェブアプリケーションでは、ユーザーが投稿した新しい情報や、フォローしている人々の活動情報などをリアルタイムに更新する必要があります。SSE を使用することで、サーバーが新しい情報を受信するたびに、クライアントにその情報をプッシュすることができます。

ストック情報のリアルタイム更新

株式市場のような業界では、株価の変動などのリアルタイム情報が必要です。SSE を使用することで、株価や取引の更新情報をリアルタイムに配信することができます。

オンラインゲームのリアルタイム更新

オンラインゲームでは、プレイヤーのアクションや他のプレイヤーの行動など、リアルタイムの情報配信が必要です。SSE を使用することで、プレイヤーにリアルタイムな情報を配信することができます。

これらは一例であり、リアルタイムな情報配信が必要なさまざまな種類のウェブアプリケーションで SSE が使用されることがあります。

開発環境

  • Windows 11 Home 22H2 を使用しています。
  • WSL の Ubuntu を操作していきますので macOS の方も参考にして頂けます。

WSL (Microsoft Store アプリ版)

> wsl --version
WSL バージョン: 1.0.3.0
カーネル バージョン: 5.15.79.1
WSLg バージョン: 1.0.47

Ubuntu

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 22.04.1 LTS
Release:        22.04

Java JDK ※ 最小構成 Java JDK の導入と Hello World!

$ java -version
openjdk version "11.0.17" 2022-10-18
OpenJDK Runtime Environment (build 11.0.17+8-post-Ubuntu-1ubuntu222.04)
OpenJDK 64-Bit Server VM (build 11.0.17+8-post-Ubuntu-1ubuntu222.04, mixed mode, sharing)

Maven ※ 最小構成 Maven の導入と Hello World!

$ mvn -version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.17, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64

※ この記事では基本的に Ubuntu のターミナルで操作を行います。

[同期処理] Spring MVC WEBサービスの作成

※ Spring Boot で作成します。
同期処理と非同期処理の比較の為に作成するので必要ない方は読み飛ばして下さい🙇‍♂️

こちらを展開してご覧いただけます。

プロジェクトフォルダの作成

※ ~/tmp/sync-spring-mvc をプロジェクトフォルダとします。

$ cd ~
$ mkdir -p tmp/sync-spring-mvc
$ cd ~/tmp/sync-spring-mvc

アプリケーションクラスの作成

※ 構成を単純にする為に全ての要素を記述しています。

$ mkdir -p src/main/java/com/example/springmvc
$ vim src/main/java/com/example/springmvc/SpringbootApplication.java

ファイルの内容

SpringbootApplication.java
package com.example.springmvc;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@SpringBootApplication
public class SpringbootApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }

    @GetMapping("/one")
    public Map<String, String> getOne() throws InterruptedException {
        Map<String, String> map = Map.of("message", "Hello Object!");
        TimeUnit.SECONDS.sleep(2);
        log.info("Sending message: {}", map);
        return map;
    }

    @GetMapping("/list")
    public List<Map<String, String>> getList() throws InterruptedException {
        List<Map<String, String>> list = List.of(
            Map.of("message", "Hello List 1!"),
            Map.of("message", "Hello List 2!"),
            Map.of("message", "Hello List 3!"),
            Map.of("message", "Hello List 4!"),
            Map.of("message", "Hello List 5!"));
        list.forEach(map -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                log.error(e.getMessage());
                throw new RuntimeException(e);
            }
            log.info("Sending message: {}", map);
        });
        return list;
    }
}

※ 処理にそれぞれ2秒のウエイト設定を持たせています。

pom.xml の作成

$ vim pom.xml

ファイルの内容

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>sync-spring-mvc</artifactId>
    <version>1.0</version>
    <name>sync-spring-mvc</name>

    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
        <!-- Log -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>app</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Spring Boot 設定ファイル作成

  • application.properties

ログ設定ファイル作成

  • logback-spring.xml

※ 上記2つの設定ファイルの作成は以前の記事を参考にして頂けます。

Spring MVC アプリを実行

Java アプリビルド

$ mvn clean install

Java アプリ起動 (Ctrl + C で停止)

$ mvn spring-boot:run

Spring MVC アプリを curl コマンドで確認

※ 別のターミナルから確認します。
※ 見やすさの為一部改行を加えています。

オブジェクト(Map) を取得した場合

$ curl -v http://localhost:8080/one
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /one HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Type: application/json
< Transfer-Encoding: chunked
< Date: Tue, 21 Feb 2023 10:52:42 GMT
<
* Connection #0 to host localhost left intact
{"message":"Hello Object!"}

ログファイル

2023-02-21 19:52:30.673 [INFO ] [main] org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start:220 - Tomcat started on port(s): 8080 (http) with context path ''
2023-02-21 19:52:30.682 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.248 seconds (JVM running for 1.459)
2023-02-21 19:52:40.319 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:525 - Initializing Servlet 'dispatcherServlet'
2023-02-21 19:52:40.321 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:547 - Completed initialization in 1 ms
2023-02-21 19:52:42.338 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.getOne:26 - Sending message: {message=Hello Object!}

※ http-nio-8080-exec-1 というスレッド名から Spring MVC の Tomcat のスレッドで処理していると推測出来ます。

リストを取得した場合

$ curl -v http://localhost:8080/list
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /list HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Type: application/json
< Transfer-Encoding: chunked
< Date: Tue, 21 Feb 2023 10:41:12 GMT
<
* Connection #0 to host localhost left intact
[{"message":"Hello List 1!"},{"message":"Hello List 2!"},{"message":"Hello List 3!"},{"message":"Hello List 4!"},{"message":"Hello List 5!"}]

ログファイル

2023-02-21 19:40:50.087 [INFO ] [main] org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start:220 - Tomcat started on port(s): 8080 (http) with context path ''
2023-02-21 19:40:50.095 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.194 seconds (JVM running for 1.405)
2023-02-21 19:41:02.171 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:525 - Initializing Servlet 'dispatcherServlet'
2023-02-21 19:41:02.174 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:547 - Completed initialization in 1 ms
2023-02-21 19:41:04.192 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 1!}
2023-02-21 19:41:06.196 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 2!}
2023-02-21 19:41:08.197 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 3!}
2023-02-21 19:41:10.198 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 4!}
2023-02-21 19:41:12.200 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 5!}

※ http-nio-8080-exec-1 というスレッド名から Spring MVC の Tomcat のスレッドで処理していると推測出来ます。
※ クライアント(curl)側にはデータは時間が経ってから一度に取得されます。

[非同期処理] Spring WebFlux WEBサービスの作成

※ Spring Boot で作成します。

プロジェクトフォルダの作成

※ ~/tmp/async-spring-webflux をプロジェクトフォルダとします。

$ cd ~
$ mkdir -p tmp/async-spring-webflux
$ cd ~/tmp/async-spring-webflux

アプリケーションクラスの作成

※ 構成を単純にする為に全ての要素を記述しています。

$ mkdir -p src/main/java/com/example/springwebflux
$ vim src/main/java/com/example/springwebflux/SpringbootApplication.java

ファイルの内容

SpringbootApplication.java
package com.example.springwebflux;

import java.time.Duration;
import java.util.Map;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@CrossOrigin(origins = "*", allowedHeaders = "*")
@RestController
@SpringBootApplication
public class SpringbootApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }

    @GetMapping("/mono")
    public Mono<Map<String, String>> getMono() {
        return Mono.just(
            Map.of("message", "Hello Mono!"))
            .delayElement(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map));
    }

    @GetMapping("/flux")
    public Flux<Map<String, String>> getFlux() {
        return Flux.range(1, 5)
            .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
            .delayElements(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map));
    }

    @GetMapping("/flux-sse")
    public Flux<ServerSentEvent<Map<String, String>>> getFluxWithSSE() {
        return Flux.range(1, 5)
            .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
            .delayElements(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map))
            .map(map -> ServerSentEvent.builder(map).build())
            .concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build()));
    }
}

※ 処理にそれぞれ2秒のウエイト設定を持たせています。
※ ポイント: Spring WebFlux で SSE のサーバー側実装をする場合、受け側のクライアントにより明示的に "event: end" を送信する必要がありました。

説明

クライアントに非同期の単一値応答を返すエンドポイント

@GetMapping("/mono")
public Mono<Map<String, String>> getMono() {
    return Mono.just(
        Map.of("message", "Hello Mono!"))
        .delayElement(Duration.ofSeconds(2))
        .doOnNext(map -> log.info("Sending message: {}", map));
}
  • @GetMapping("/mono") アノテーションは HTTP GETリクエストを受信し "/mono" エンドポイントにマッピングします。

  • Mono<Map<String, String>> は、単一の非同期値を表す Reactive Streams の型です。この場合、単一の Map オブジェクトを表しています。

  • Mono.just(Map.of("message", "Hello Mono!")) は、"message" キーとその値を持つ Map オブジェクトを作成しそれを Mono にラップして返します。

  • .delayElement(Duration.ofSeconds(2)) は、応答を2秒間遅延させるために Mono を変換する演算子です。

  • .doOnNext(map -> log.info("Sending message: {}", map)) は、値が通知されるたびに log オブジェクトにメッセージを出力するための副作用を追加するための演算子です。

  • つまり、このエンドポイントは、2秒間待ってから、"Hello Mono!" メッセージを含む Map オブジェクトを含む単一値を返します。また、ログには、値が通知されたときに "Sending message: {message=Hello Mono!}" というメッセージが出力されます。

クライアントに非同期の複数値応答を返すエンドポイント

@GetMapping("/flux")
public Flux<Map<String, String>> getFlux() {
    return Flux.range(1, 5)
        .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
        .delayElements(Duration.ofSeconds(2))
        .doOnNext(map -> log.info("Sending message: {}", map));
}
  • @GetMapping("/flux") アノテーションは HTTP GETリクエストを受信し "/flux" エンドポイントにマッピングします。

  • Flux<Map<String, String>> は、複数の非同期値を表す Reactive Streams の型です。この場合複数の Map オブジェクトを表しています。

  • Flux.range(1, 5) は、1から5までの整数の Flux を生成します。

  • .map(idx -> Map.of("message", "Hello Flux " + idx + "!")) は、各要素に対して "message" キーとその値を持つ Map オブジェクトを作成するための演算子です。

  • .delayElements(Duration.ofSeconds(2)) は、各要素を2秒間遅延させるための演算子です。

  • .doOnNext(map -> log.info("Sending message: {}", map)) は、各要素が通知されるたびに log オブジェクトにメッセージを出力するための副作用を追加するための演算子です。

  • つまり、このエンドポイントは、各要素が2秒間隔で通知され "Hello Flux {index}!" メッセージを含む Map オブジェクトが含まれる複数の値を返します。また、ログには各値が通知されたときに "Sending message: {message=Hello Flux {index}!}" というメッセージが出力されます。

クライアントに Server-Sent Events(SSE)プロトコルを使用して非同期でデータを送信するエンドポイント

@GetMapping("/flux-sse")
public Flux<ServerSentEvent<Map<String, String>>> getFluxWithSSE() {
    return Flux.range(1, 5)
        .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
        .delayElements(Duration.ofSeconds(2))
        .doOnNext(map -> log.info("Sending message: {}", map))
        .map(map -> ServerSentEvent.builder(map).build())
        .concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build()));
}
  • @GetMapping("/flux-sse") アノテーションは HTTP GETリクエストを受信し "/flux-sse" エンドポイントにマッピングします。

  • Flux<ServerSentEvent<Map<String, String>>> は Server-Sent Events を使用して複数の非同期値を表す Reactive Streams の型です。この場合、複数の Map オブジェクトを表しています。

  • Flux.range(1, 5) は、1から5までの整数の Flux を生成します。

  • .map(idx -> Map.of("message", "Hello Flux " + idx + "!")) は、各要素に対して "message" キーとその値を持つ Map オブジェクトを作成するための演算子です。

  • .delayElements(Duration.ofSeconds(2)) は各要素を2秒間遅延させるための演算子です。

  • .doOnNext(map -> log.info("Sending message: {}", map)) は各要素が通知されるたびに log オブジェクトにメッセージを出力するための副作用を追加するための演算子です。

  • .map(map -> ServerSentEvent.builder(map).build()) は各 Map オブジェクトを ServerSentEvent オブジェクトに変換するための演算子です。

  • .concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build())) はストリームの最後に ServerSentEvent オブジェクトを追加するための演算子です。これにより SSEストリームの終わりを示す event: end タグが送信されます。

  • つまり、このエンドポイントは、各要素が2秒間隔で通知され "Hello Flux {index}!" メッセージを含む Map オブジェクトが含まれる複数の値を Server-Sent Events 形式で返します。また、ログには各値が通知されたときに "Sending message: {message=Hello Flux {index}!}" というメッセージが出力されます。

pom.xml の作成

$ vim pom.xml
ファイルの内容
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>sync-spring-webflux</artifactId>
    <version>1.0</version>
    <name>sync-spring-webflux</name>

    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
        <!-- Log -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>app</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Spring Boot 設定ファイル作成

  • application.properties

ログ設定ファイル作成

  • logback-spring.xml

※ 上記2つの設定ファイルの作成は以前の記事を参考にして頂けます。

Spring WebFlux アプリを実行

Java アプリビルド

$ mvn clean install

Java アプリ起動 (Ctrl + C で停止)

$ mvn spring-boot:run

Spring WebFlux アプリを curl コマンドで確認

※ 別のターミナルから確認します。
※ 見やすさの為一部改行を加えています。

Mono を普通に HTTP でリクエストした場合

$ curl -v http://localhost:8080/mono
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /mono HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: application/json
< Content-Length: 25
<
* Connection #0 to host localhost left intact
{"message":"Hello Mono!"}

ログファイル

2023-02-21 19:09:36.821 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-02-21 19:09:36.829 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.095 seconds (JVM running for 1.321)
2023-02-21 19:09:44.125 [INFO ] [parallel-1] com.example.springwebflux.SpringbootApplication.lambda$0:46 - Sending message: {message=Hello Mono!}

※ parallel-1 というスレッド名から Spring WebFlux は並列処理のスレッドで Mono を処理していると推測出来ます。

Flux を普通の HTTP でリクエストした場合

$ curl -v http://localhost:8080/flux
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /flux HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: application/json
<
* Connection #0 to host localhost left intact
[{"message":"Hello Flux 1!"},{"message":"Hello Flux 2!"},{"message":"Hello Flux 3!"},{"message":"Hello Flux 4!"},{"message":"Hello Flux 5!"}]

ログファイル

2023-02-21 18:57:26.755 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-02-21 18:57:26.763 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.092 seconds (JVM running for 1.314)
2023-02-21 18:57:39.522 [INFO ] [parallel-1] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 1!}
2023-02-21 18:57:41.527 [INFO ] [parallel-2] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 2!}
2023-02-21 18:57:43.529 [INFO ] [parallel-3] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 3!}
2023-02-21 18:57:45.531 [INFO ] [parallel-4] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 4!}
2023-02-21 18:57:47.533 [INFO ] [parallel-5] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 5!}

※ parallel-n というスレッド名から Spring WebFlux は2秒ごとに並列処理のスレッドで Flux を処理していると推測出来ます。
※ クライアント(curl)側にはデータは時間が経ってから一度に取得されます。

Flux を Server-Sent Events 指定の HTTP でリクエストした場合

※ "Accept: text/event-stream" ヘッダーを追加します。

$ curl -v http://localhost:8080/flux-sse -H 'Accept: text/event-stream'
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /flux-sse HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: text/event-stream
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: text/event-stream;charset=UTF-8
<
data:{"message":"Hello Flux 1!"}

data:{"message":"Hello Flux 2!"}

data:{"message":"Hello Flux 3!"}

data:{"message":"Hello Flux 4!"}

data:{"message":"Hello Flux 5!"}

event:end

* Connection #0 to host localhost left intact

ログファイル

2023-02-21 19:25:11.977 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-02-21 19:25:11.985 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.092 seconds (JVM running for 1.313)
2023-02-21 19:25:18.690 [INFO ] [parallel-1] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 1!}
2023-02-21 19:25:20.708 [INFO ] [parallel-2] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 2!}
2023-02-21 19:25:22.712 [INFO ] [parallel-3] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 3!}
2023-02-21 19:25:24.717 [INFO ] [parallel-4] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 4!}
2023-02-21 19:25:26.722 [INFO ] [parallel-5] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 5!}

※ Spring WebFlux は2秒ごとに並列処理のスレッドで Flux を処理しています。
※ クライアント(curl)側にはデータは一つづつリアルタイムで取得されます。

まとめ

  • Spring WebFlux で Server-Sent Events を実装する初歩的なWEBサービスを実装することが出来ました。
  • 今後この Server-Sent Events を使用するクライアント側のアプリを実装して理解を深ようと思いました。

参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0