LoginSignup
1
0

Spring WebFlux Webサービスで Server-Sent Events で Hello World する

Last updated at Posted at 2023-03-20

Spring WebFlux Webサービスで Server-Sent Events で Hello World する

こんにちは、@studio_meowtoon です。今回は、WSL の Ubuntu 22.04 で、Server-Sent Events を実装する Spring WebFlux Web サービスを作成して非同期動作を確認する方法を紹介します。
spring-boot_on_ubuntu.png

目的

Windows 11 の Linux でクラウド開発します。

こちらから記事の一覧がご覧いただけます。

実現すること

Ubuntu で Spring WebFlux アプリケーションの Server-Sent Events 動作を確認します。

技術トピックス

Spring WebFlux とは?

こちらを展開してご覧いただけます。

Spring WebFlux

非同期・反応的な Webアプリケーションを開発するための Spring Framework のモジュールです。

キーワード 内容
非同期・反応的なリクエスト処理 Spring WebFlux は、リアクティブストリームと呼ばれる仕組みを使用して、リクエストを非同期で処理することができます。これにより、少ないリソースで高いスループットを実現することができます。
WebFlux サーバー Spring WebFlux には、Netty をベースにした WebFlux サーバーが含まれています。このサーバーは、高いスループットと低いレイテンシーを実現することができます。
Reactive Streams API のサポート Spring WebFlux は、Reactive Streams API と呼ばれる仕様に準拠しており、他の Reactive Streams API を使用するライブラリと統合することができます。これにより、より柔軟で拡張性の高いアプリケーションを開発することができます。

Server-Sent Events (SSE) とは?

こちらを展開してご覧いただけます。

Server-Sent Events

Server-Sent Events (SSE) は、サーバーからの単方向のリアルタイム通信に特化した技術であり、サーバーがクライアントに対してリアルタイムに更新情報をプッシュすることができます。 長時間接続を維持することで、クライアントがサーバーからの更新情報をリアルタイムで受け取ることができます。

SSE は、リアルタイムの情報配信が必要なウェブアプリケーションに使用されます。例えば、以下のような状況で使用されることがあります。

キーワード 内容
ソーシャルメディアのリアルタイム更新 ソーシャルメディアのようなウェブアプリケーションでは、ユーザーが投稿した新しい情報や、フォローしている人々の活動情報などをリアルタイムに更新する必要があります。SSE を使用することで、サーバーが新しい情報を受信するたびに、クライアントにその情報をプッシュすることができます。
ストック情報のリアルタイム更新 株式市場のような業界では、株価の変動などのリアルタイム情報が必要です。SSE を使用することで、株価や取引の更新情報をリアルタイムに配信することができます。
オンラインゲームのリアルタイム更新 オンラインゲームでは、プレイヤーのアクションや他のプレイヤーの行動など、リアルタイムの情報配信が必要です。SSE を使用することで、プレイヤーにリアルタイムな情報を配信することができます。

これらは一例であり、リアルタイムな情報配信が必要なさまざまな種類のウェブアプリケーションで SSE が使用されることがあります。

開発環境

  • Windows 11 Home 22H2 を使用しています。

WSL の Ubuntu を操作していきますので macOS の方も参考にして頂けます。

WSL (Microsoft Store アプリ版) ※ こちらの関連記事からインストール方法をご確認いただけます

> wsl --version
WSL バージョン: 1.0.3.0
カーネル バージョン: 5.15.79.1
WSLg バージョン: 1.0.47

Ubuntu ※ こちらの関連記事からインストール方法をご確認いただけます

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 22.04.1 LTS
Release:        22.04

Java JDK ※ こちらの関連記事からインストール方法をご確認いただけます

$ java -version
openjdk version "17.0.6" 2023-01-17
OpenJDK Runtime Environment GraalVM CE 22.3.1 (build 17.0.6+10-jvmci-22.3-b13)
OpenJDK 64-Bit Server VM GraalVM CE 22.3.1 (build 17.0.6+10-jvmci-22.3-b13, mixed mode, sharing)

Maven ※ こちらの関連記事からインストール方法をご確認いただけます

$ mvn -version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.18, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64

この記事では基本的に Ubuntu のターミナルで操作を行います。Vim を使用してコピペする方法を初めて学ぶ人のために、以下の記事で手順を紹介しています。ぜひ挑戦してみてください。

作成する Web アプリケーションの仕様

No フレームワーク エンドポイント HTTPメソッド MIME タイプ
1 Spring MVC /one GET application/json
2 Spring MVC /list GET application/json
3 Spring WebFlux /mono GET application/json
4 Spring WebFlux /flux GET application/json
5 Spring WebFlux /flux-sse GET text/event-stream

同期処理

Spring MVC Web サービスの作成

Spring Boot で作成します。 同期処理非同期処理比較の為に作成するので特に必要ない方は読み飛ばして下さい。

こちらを展開してご覧いただけます。

プロジェクトフォルダの作成

プロジェクトフォルダを作成します。
※ ~/tmp/sync-spring-mvc をプロジェクトフォルダとします。

$ mkdir -p ~/tmp/sync-spring-mvc
$ cd ~/tmp/sync-spring-mvc

アプリケーションクラスの作成

アプリケーションクラスを作成します。

プロジェクト構成を単純にするために全ての要素を Application クラスに記述しています。

$ mkdir -p src/main/java/com/example/springmvc
$ vim src/main/java/com/example/springmvc/Application.java

ファイルの内容

Application.java
package com.example.springmvc;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @GetMapping("/one")
    public Map<String, String> getOne() throws InterruptedException {
        Map<String, String> map = Map.of("message", "Hello Object!");
        TimeUnit.SECONDS.sleep(2);
        log.info("Sending message: {}", map);
        return map;
    }

    @GetMapping("/list")
    public List<Map<String, String>> getList() throws InterruptedException {
        List<Map<String, String>> list = List.of(
            Map.of("message", "Hello List 1!"),
            Map.of("message", "Hello List 2!"),
            Map.of("message", "Hello List 3!"),
            Map.of("message", "Hello List 4!"),
            Map.of("message", "Hello List 5!"));
        list.forEach(map -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                log.error(e.getMessage());
                throw new RuntimeException(e);
            }
            log.info("Sending message: {}", map);
        });
        return list;
    }
}

処理にそれぞれ2秒のウエイト設定を持たせています。

pom.xml の作成

pom.xml ファイルを作成します。

$ vim pom.xml

ファイルの内容

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>sync-spring-mvc</artifactId>
    <version>1.0</version>
    <name>sync-spring-mvc</name>

    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>app</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

ログファイルを出力する設定

  • application.properties
  • logback-spring.xml

こちらの関連記事で手順がご確認いただけます。

https://qiita.com/studio_meowtoon/items/ffbcb6ac9179852b2816

アプリのビルド

Java アプリをビルドします。
※ target/app.jar が作成されます。

$ mvn clean install

アプリの起動

アプリを起動します。
※ アプリを停止するときは ctrl + C を押します。

$ rm -rf log
$ mvn spring-boot:run

アプリの動作確認

別ターミナルから curl コマンドで確認します。

オブジェクト(Map) を取得した場合

$ curl -v http://localhost:8080/one -w '\n'
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /one HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Type: application/json
< Transfer-Encoding: chunked
< Date: Mon, 20 Mar 2023 03:53:45 GMT
<
* Connection #0 to host localhost left intact
{"message":"Hello Object!"}

別ターミナルからログファイルを確認します。※ 必要な個所を抜粋しています。

$ cd ~/tmp/sync-spring-mvc/log
$ cat app.log
2023-03-20 13:05:32.679 [INFO ] [main] org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start:220 - Tomcat started on port(s): 8080 (http) with context path ''
2023-03-20 13:05:32.686 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started Application in 0.954 seconds (JVM running for 1.11)
2023-03-20 13:05:45.910 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:525 - Initializing Servlet 'dispatcherServlet'
2023-03-20 13:05:45.913 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:547 - Completed initialization in 1 ms
2023-03-20 13:05:47.930 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.getOne:26 - Sending message: {message=Hello Object!}

http-nio-8080-exec-1 というスレッド名から Spring MVC の Tomcat のスレッドで処理していると推測出来ます。

リストを取得した場合

別ターミナルから curl コマンドで確認します。

$ curl -v http://localhost:8080/list -w '\n'
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /list HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Type: application/json
< Transfer-Encoding: chunked
< Date: Mon, 20 Mar 2023 03:56:19 GMT
<
* Connection #0 to host localhost left intact
[{"message":"Hello List 1!"},{"message":"Hello List 2!"},{"message":"Hello List 3!"},{"message":"Hello List 4!"},{"message":"Hello List 5!"}]

別ターミナルからログファイルを確認します。※ 必要な個所を抜粋しています。

$ cd ~/tmp/sync-spring-mvc/log
$ cat app.log
2023-03-20 13:06:34.424 [INFO ] [main] org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start:220 - Tomcat started on port(s): 8080 (http) with context path ''
2023-03-20 13:06:34.430 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started Application in 0.947 seconds (JVM running for 1.102)
2023-03-20 13:06:38.337 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:525 - Initializing Servlet 'dispatcherServlet'
2023-03-20 13:06:38.340 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:547 - Completed initialization in 1 ms
2023-03-20 13:06:40.522 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.lambda$getList$0:45 - Sending message: {message=Hello List 1!}
2023-03-20 13:06:42.691 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.lambda$getList$0:45 - Sending message: {message=Hello List 2!}
2023-03-20 13:06:44.721 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.lambda$getList$0:45 - Sending message: {message=Hello List 3!}
2023-03-20 13:06:46.740 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.lambda$getList$0:45 - Sending message: {message=Hello List 4!}
2023-03-20 13:06:48.748 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.lambda$getList$0:45 - Sending message: {message=Hello List 5!}

http-nio-8080-exec-1 というスレッド名から Spring MVC の Tomcat のスレッドで処理していると推測出来ます。クライアント (curl) 側にはデータは時間が経ってから一度に取得されます。

ここまでのまとめ

Spring MVC では、組み込みの Web サーバーとして Tomcat が使用されています。 List オブジェクトは同期で処理されています。

非同期処理

Spring WebFlux Web サービスの作成

Spring Boot で作成します。

プロジェクトフォルダの作成

プロジェクトフォルダを作成します。
※ ~/tmp/async-spring-webflux をプロジェクトフォルダとします。

$ mkdir -p ~/tmp/async-spring-webflux
$ cd ~/tmp/async-spring-webflux

アプリケーションクラスの作成

アプリケーションクラスを作成します。

プロジェクト構成を単純にするために全ての要素を Application クラスに記述しています。

$ mkdir -p src/main/java/com/example/springwebflux
$ vim src/main/java/com/example/springwebflux/Application.java

ファイルの内容

Application.java
package com.example.springwebflux;

import java.time.Duration;
import java.util.Map;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@CrossOrigin(origins = "*", allowedHeaders = "*")
@RestController
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @GetMapping("/mono")
    public Mono<Map<String, String>> getMono() {
        return Mono.just(
            Map.of("message", "Hello Mono!"))
            .delayElement(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map));
    }

    @GetMapping("/flux")
    public Flux<Map<String, String>> getFlux() {
        return Flux.range(1, 5)
            .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
            .delayElements(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map));
    }

    @GetMapping("/flux-sse")
    public Flux<ServerSentEvent<Map<String, String>>> getFluxWithSSE() {
        return Flux.range(1, 5)
            .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
            .delayElements(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map))
            .map(map -> ServerSentEvent.builder(map).build())
            .concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build()));
    }
}

処理にそれぞれ2秒のウエイト設定を持たせています。

Spring WebFluxSSEサーバー側実装をする場合、受け側のクライアントにより明示的に event: end を送信する必要がありました。

説明を開きます。

クライアントに非同期の単一値応答を返すエンドポイント

@GetMapping("/mono")
public Mono<Map<String, String>> getMono() {
    return Mono.just(
        Map.of("message", "Hello Mono!"))
        .delayElement(Duration.ofSeconds(2))
        .doOnNext(map -> log.info("Sending message: {}", map));
}
内容
@GetMapping("/mono") アノテーションは HTTP GETリクエストを受信し "/mono" エンドポイントにマッピングします。
Mono<Map<String, String>> は、単一の非同期値を表す Reactive Streams の型です。この場合、単一の Map オブジェクトを表しています。
Mono.just(Map.of("message", "Hello Mono!")) は、"message" キーとその値を持つ Map オブジェクトを作成しそれを Mono にラップして返します。
.delayElement(Duration.ofSeconds(2)) は、応答を2秒間遅延させるために Mono を変換する演算子です。
.doOnNext(map -> log.info("Sending message: {}", map)) は、値が通知されるたびに log オブジェクトにメッセージを出力するための副作用を追加するための演算子です。
つまり、このエンドポイントは、2秒間待ってから、"Hello Mono!" メッセージを含む Map オブジェクトを含む単一値を返します。また、ログには、値が通知されたときに "Sending message: {message=Hello Mono!}" というメッセージが出力されます。

クライアントに非同期の複数値応答を返すエンドポイント

@GetMapping("/flux")
public Flux<Map<String, String>> getFlux() {
    return Flux.range(1, 5)
        .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
        .delayElements(Duration.ofSeconds(2))
        .doOnNext(map -> log.info("Sending message: {}", map));
}
内容
@GetMapping("/flux") アノテーションは HTTP GETリクエストを受信し "/flux" エンドポイントにマッピングします。
Flux<Map<String, String>> は、複数の非同期値を表す Reactive Streams の型です。この場合複数の Map オブジェクトを表しています。
Flux.range(1, 5) は、1から5までの整数の Flux を生成します。
.map(idx -> Map.of("message", "Hello Flux " + idx + "!")) は、各要素に対して "message" キーとその値を持つ Map オブジェクトを作成するための演算子です。
.delayElements(Duration.ofSeconds(2)) は、各要素を2秒間遅延させるための演算子です。
.doOnNext(map -> log.info("Sending message: {}", map)) は、各要素が通知されるたびに log オブジェクトにメッセージを出力するための副作用を追加するための演算子です。
つまり、このエンドポイントは、各要素が2秒間隔で通知され "Hello Flux {index}!" メッセージを含む Map オブジェクトが含まれる複数の値を返します。また、ログには各値が通知されたときに "Sending message: {message=Hello Flux {index}!}" というメッセージが出力されます。

クライアントに Server-Sent Events(SSE)プロトコルを使用して非同期でデータを送信するエンドポイント

@GetMapping("/flux-sse")
public Flux<ServerSentEvent<Map<String, String>>> getFluxWithSSE() {
    return Flux.range(1, 5)
        .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
        .delayElements(Duration.ofSeconds(2))
        .doOnNext(map -> log.info("Sending message: {}", map))
        .map(map -> ServerSentEvent.builder(map).build())
        .concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build()));
}
内容
@GetMapping("/flux-sse") アノテーションは HTTP GETリクエストを受信し "/flux-sse" エンドポイントにマッピングします。
Flux<ServerSentEvent<Map<String, String>>> は Server-Sent Events を使用して複数の非同期値を表す Reactive Streams の型です。この場合、複数の Map オブジェクトを表しています。
Flux.range(1, 5) は、1から5までの整数の Flux を生成します。
.map(idx -> Map.of("message", "Hello Flux " + idx + "!")) は、各要素に対して "message" キーとその値を持つ Map オブジェクトを作成するための演算子です。
.delayElements(Duration.ofSeconds(2)) は各要素を2秒間遅延させるための演算子です。
.doOnNext(map -> log.info("Sending message: {}", map)) は各要素が通知されるたびに log オブジェクトにメッセージを出力するための副作用を追加するための演算子です。
.map(map -> ServerSentEvent.builder(map).build()) は各 Map オブジェクトを ServerSentEvent オブジェクトに変換するための演算子です。
.concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build())) はストリームの最後に ServerSentEvent オブジェクトを追加するための演算子です。これにより SSEストリームの終わりを示す event: end タグが送信されます。
つまり、このエンドポイントは、各要素が2秒間隔で通知され "Hello Flux {index}!" メッセージを含む Map オブジェクトが含まれる複数の値を Server-Sent Events 形式で返します。また、ログには各値が通知されたときに "Sending message: {message=Hello Flux {index}!}" というメッセージが出力されます。

pom.xml の作成

pom.xml ファイルを作成します。

$ vim pom.xml

ファイルの内容

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>sync-spring-webflux</artifactId>
    <version>1.0</version>
    <name>sync-spring-webflux</name>

    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>app</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

ログファイルを出力する設定

  • application.properties
  • logback-spring.xml

こちらの関連記事で手順がご確認いただけます。

アプリのビルド

Java アプリをビルドします。
※ target/app.jar が作成されます。

$ mvn clean package

アプリを起動

アプリを起動します。
※ アプリを停止するときは ctrl + C を押します。

$ rm -rf log
$ mvn spring-boot:run

アプリの動作確認

別ターミナルから curl コマンドで確認します。

Mono を普通に HTTP でリクエストした場合

$ curl -v http://localhost:8080/mono -w '\n'
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /mono HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: application/json
< Content-Length: 25
<
* Connection #0 to host localhost left intact
{"message":"Hello Mono!"}

別ターミナルからログファイルを確認します。※ 必要な個所を抜粋しています。

$ cd ~/tmp/async-spring-webflux/log
$ cat app.log
2023-03-20 12:11:59.869 [DEBUG] [main] org.springframework.boot.StartupInfoLogger.logStarting:56 - Running with Spring Boot v2.7.8, Spring v5.3.25
2023-03-20 12:11:59.870 [INFO ] [main] org.springframework.boot.SpringApplication.logStartupProfileInfo:637 - The following 1 profile is active: "develop"
2023-03-20 12:12:00.556 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-03-20 12:12:00.562 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started Application in 0.987 seconds (JVM running for 1.221)
2023-03-20 12:15:32.844 [INFO ] [parallel-1] com.example.springwebflux.Application.lambda$getMono$0:32 - Sending message: {message=Hello Mono!}

parallel-1 というスレッド名から Spring WebFlux は並列処理のスレッドで Mono を処理していると推測出来ます。

Flux を普通の HTTP でリクエストした場合

別ターミナルから curl コマンドで確認します。

$ curl -v http://localhost:8080/flux -w '\n'
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /flux HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: application/json
<
* Connection #0 to host localhost left intact
[{"message":"Hello Flux 1!"},{"message":"Hello Flux 2!"},{"message":"Hello Flux 3!"},{"message":"Hello Flux 4!"},{"message":"Hello Flux 5!"}]

別ターミナルからログファイルを確認します。※ 必要な個所を抜粋しています。

$ cd ~/tmp/async-spring-webflux/log
$ cat app.log
2023-03-20 12:19:28.477 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-03-20 12:19:28.482 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started Application in 0.95 seconds (JVM running for 1.124)
2023-03-20 12:19:42.389 [INFO ] [parallel-1] com.example.springwebflux.Application.lambda$getFlux$2:40 - Sending message: {message=Hello Flux 1!}
2023-03-20 12:19:44.392 [INFO ] [parallel-2] com.example.springwebflux.Application.lambda$getFlux$2:40 - Sending message: {message=Hello Flux 2!}
2023-03-20 12:19:46.394 [INFO ] [parallel-3] com.example.springwebflux.Application.lambda$getFlux$2:40 - Sending message: {message=Hello Flux 3!}
2023-03-20 12:19:48.396 [INFO ] [parallel-4] com.example.springwebflux.Application.lambda$getFlux$2:40 - Sending message: {message=Hello Flux 4!}
2023-03-20 12:19:50.399 [INFO ] [parallel-5] com.example.springwebflux.Application.lambda$getFlux$2:40 - Sending message: {message=Hello Flux 5!}

parallel-n というスレッド名から Spring WebFlux は2秒ごとに並列処理のスレッドで Flux を処理していると推測出来ます。クライアント (curl) 側にはデータは時間が経ってから一度に取得されます。

Flux を Server-Sent Events 指定の HTTP でリクエストした場合

別ターミナルから curl コマンドで確認します。

Accept: text/event-stream ヘッダーを追加します。

$ curl -v http://localhost:8080/flux-sse -H 'Accept: text/event-stream' -w '\n'
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /flux-sse HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: text/event-stream
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: text/event-stream;charset=UTF-8
<
data:{"message":"Hello Flux 1!"}

data:{"message":"Hello Flux 2!"}

data:{"message":"Hello Flux 3!"}

data:{"message":"Hello Flux 4!"}

data:{"message":"Hello Flux 5!"}

event:end

* Connection #0 to host localhost left intact

別ターミナルからログファイルを確認します。※ 必要な個所を抜粋しています。

$ cd ~/tmp/async-spring-webflux/log
$ cat app.log
cat app.log
2023-03-20 12:22:02.249 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-03-20 12:22:02.256 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started Application in 1.035 seconds (JVM running for 1.224)
2023-03-20 12:22:17.814 [INFO ] [parallel-1] com.example.springwebflux.Application.lambda$getFluxWithSSE$4:48 - Sending message: {message=Hello Flux 1!}
2023-03-20 12:22:19.827 [INFO ] [parallel-2] com.example.springwebflux.Application.lambda$getFluxWithSSE$4:48 - Sending message: {message=Hello Flux 2!}
2023-03-20 12:22:21.831 [INFO ] [parallel-3] com.example.springwebflux.Application.lambda$getFluxWithSSE$4:48 - Sending message: {message=Hello Flux 3!}
2023-03-20 12:22:23.842 [INFO ] [parallel-4] com.example.springwebflux.Application.lambda$getFluxWithSSE$4:48 - Sending message: {message=Hello Flux 4!}
2023-03-20 12:22:25.854 [INFO ] [parallel-5] com.example.springwebflux.Application.lambda$getFluxWithSSE$4:48 - Sending message: {message=Hello Flux 5!}

parallel-n というスレッド名から Spring WebFlux は2秒ごとに並列処理のスレッドで Flux を処理しています。クライアント (curl) 側にはデータは一つづつリアルタイムで取得されます。

ここまでのまとめ

Spring WebFlux では、組み込みの Web サーバーとして Netty が使用されています。 Flux オブジェクトは非同期で処理されています。

まとめ

Ubuntu に構築したシンプルな Java 開発環境で、Server-Sent Events を実装する Spring WebFlux Web サービスを実行することができました。

Ubuntu を使うと Linux の知識も身に付きます。最初は難しく感じるかもしれませんが、徐々に進めていけば自信を持って書けるようになります。

どうでしたか? WSL Ubuntu で、Server-Sent Events を実装する Spring Boot Web アプリケーションを手軽に実行することができます。ぜひお試しください。今後も Java の開発環境などを紹介していきますので、ぜひお楽しみにしてください。

参考資料

W3C Server-Sent Events

HTML Living Standard 9.2 Server-sent events

MDN サーバー送信イベント

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0