0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[Spring Framework] WebClient Fluxで送信

Posted at

ストリーミング送信にしたい場合

DTOで持たずに、**Base64を読みながら1行ずつ送信(またはチャンク)**したい場合は、Fluxを使う

public Mono<String> postStreaming(Flux<String> base64Lines) {
    return webClient.post()
            .uri("/stream-endpoint")
            .contentType(MediaType.TEXT_PLAIN)
            .body(base64Lines, String.class)
            .retrieve()
            .bodyToMono(String.class);
}
方針 向いてるケース メモリ効率
bodyValue(String) JSON文字列を直接渡したい △(大容量で危険)
bodyValue(DTO) 通常のJSON通信 △(Base64がでかいと危険)
Flux + streaming Base64などを逐次送信したい ◎(大容量向き)

Fluxで送信は、データを チャンク(部分ごと)で送信しつつ、内部的に NIO(ノンブロッキング I/O)で動く

リクエスト全体を一度にメモリに載せず

  • 少しずつ送信しながら

  • 他のスレッドにブロックされずに処理を続けられる

つまり:非同期 + ストリーミング + メモリ効率◎

ポイント:

  • Flux は 1行ずつ読み出し → そのまま HTTP ボディとして送信
  • WebClient はそれをチャンクにして非同期で送る
  • backpressure(背圧)制御も効くので、大量データでも安定
Flux<String> base64Lines = Flux.using(
    () -> Files.lines(Paths.get("huge-base64.txt")), // Stream<String>
    Flux::fromStream,
    Stream::close
);

webClient.post()
    .uri("/upload")
    .contentType(MediaType.TEXT_PLAIN)
    .body(base64Lines, String.class)
    .retrieve()
    .bodyToMono(String.class)
    .subscribe(response -> {
        System.out.println("Response: " + response);
    });

例2

WebClientで連携先に渡すとき、Fluxでストリーミング送信できる

たとえば Spring MVC のコントローラで InputStream で受ける

この方式のメリット
リクエスト → レスポンスまで完全にストリーミング(=重いデータでもメモリ消費最小)

WebClientがFluxをそのまま HTTP ボディに変換して送信

@PostMapping("/relay")
public Mono<String> relay(HttpServletRequest request) {
    Flux<DataBuffer> body = DataBufferUtils.readInputStream(
        request::getInputStream,
        new DefaultDataBufferFactory(),
        4096 // buffer size
    );

    return webClient.post()
        .uri("https://other-service/upload")
        .contentType(MediaType.APPLICATION_OCTET_STREAM)
        .body(BodyInserters.fromDataBuffers(body))
        .retrieve()
        .bodyToMono(String.class);
}

構成例

Controller → Service → HTTP送信クラス(WebClientラッパー)

Flux

という流れで、Flux をそのままサービス間で渡して処理するのは
リアクティブプログラミング的にも、設計的にも 全く問題ありません!

なぜアリか?

  • Flux はリアクティブストリームなので、

    • まだデータが流れていない

    • つまり「実行されるのは subscribe 以降」

  • そのため、Flux自体は **軽量なレシピ(パイプライン)**として他のメソッドに自由に渡せる

  • BodyInserters.fromDataBuffers(body) はそのまま WebClient に渡せる

構成例のサンプル

Controller

@PostMapping("/relay")
public Mono<String> relay(HttpServletRequest request) {
   Flux<DataBuffer> body = DataBufferUtils.readInputStream(
       request::getInputStream,
       new DefaultDataBufferFactory(),
       4096
   );

   return relayService.relayToOtherService(body);
}

RelayService

@Service
public class RelayService {

    private final StreamingApiClient streamingApiClient;

    public RelayService(StreamingApiClient streamingApiClient) {
        this.streamingApiClient = streamingApiClient;
    }

    public Mono<String> relayToOtherService(Flux<DataBuffer> body) {
        return streamingApiClient.sendStreaming(body);
    }
}

StreamingApiClient(WebClientラッパー)

@Component
public class StreamingApiClient {

   private final WebClient webClient;

   public StreamingApiClient(WebClient.Builder builder) {
       this.webClient = builder.baseUrl("https://api.example.com").build();
   }

   public Mono<String> sendStreaming(Flux<DataBuffer> body) {
       return webClient.post()
               .uri("/upload")
               .contentType(MediaType.APPLICATION_OCTET_STREAM)
               .body(BodyInserters.fromDataBuffers(body))
               .retrieve()
               .bodyToMono(String.class);
   }
}

なお、連携先に送る前に文字コード変換やるなら以下のようになる

Flux → 一度文字列化する

public Mono<String> forceConvertUtf8(Flux<DataBuffer> body) {
    return DataBufferUtils.join(body)
        .flatMap(buffer -> {
            byte[] bytes = new byte[buffer.readableByteCount()];
            buffer.read(bytes);
            DataBufferUtils.release(buffer);

            // 一度UTF-8で文字列にし、またUTF-8で送る
            String text = new String(bytes, StandardCharsets.UTF_8);
            return existingClient.send(text);
        });
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?