ストリーミング送信にしたい場合
DTOで持たずに、**Base64を読みながら1行ずつ送信(またはチャンク)**したい場合は、Fluxを使う
public Mono<String> postStreaming(Flux<String> base64Lines) {
return webClient.post()
.uri("/stream-endpoint")
.contentType(MediaType.TEXT_PLAIN)
.body(base64Lines, String.class)
.retrieve()
.bodyToMono(String.class);
}
方針 | 向いてるケース | メモリ効率 |
---|---|---|
bodyValue(String) | JSON文字列を直接渡したい | △(大容量で危険) |
bodyValue(DTO) | 通常のJSON通信 | △(Base64がでかいと危険) |
Flux + streaming | Base64などを逐次送信したい | ◎(大容量向き) |
Fluxで送信は、データを チャンク(部分ごと)で送信しつつ、内部的に NIO(ノンブロッキング I/O)で動く
リクエスト全体を一度にメモリに載せず
-
少しずつ送信しながら
-
他のスレッドにブロックされずに処理を続けられる
つまり:非同期 + ストリーミング + メモリ効率◎
例
ポイント:
- Flux は 1行ずつ読み出し → そのまま HTTP ボディとして送信
- WebClient はそれをチャンクにして非同期で送る
- backpressure(背圧)制御も効くので、大量データでも安定
Flux<String> base64Lines = Flux.using(
() -> Files.lines(Paths.get("huge-base64.txt")), // Stream<String>
Flux::fromStream,
Stream::close
);
webClient.post()
.uri("/upload")
.contentType(MediaType.TEXT_PLAIN)
.body(base64Lines, String.class)
.retrieve()
.bodyToMono(String.class)
.subscribe(response -> {
System.out.println("Response: " + response);
});
例2
WebClientで連携先に渡すとき、Fluxでストリーミング送信できる
たとえば Spring MVC のコントローラで InputStream で受ける
この方式のメリット
リクエスト → レスポンスまで完全にストリーミング(=重いデータでもメモリ消費最小)
WebClientがFluxをそのまま HTTP ボディに変換して送信
@PostMapping("/relay")
public Mono<String> relay(HttpServletRequest request) {
Flux<DataBuffer> body = DataBufferUtils.readInputStream(
request::getInputStream,
new DefaultDataBufferFactory(),
4096 // buffer size
);
return webClient.post()
.uri("https://other-service/upload")
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(BodyInserters.fromDataBuffers(body))
.retrieve()
.bodyToMono(String.class);
}
構成例
Controller → Service → HTTP送信クラス(WebClientラッパー)
↓
Flux
という流れで、Flux をそのままサービス間で渡して処理するのは
リアクティブプログラミング的にも、設計的にも 全く問題ありません!
なぜアリか?
-
Flux はリアクティブストリームなので、
-
まだデータが流れていない
-
つまり「実行されるのは subscribe 以降」
-
-
そのため、Flux自体は **軽量なレシピ(パイプライン)**として他のメソッドに自由に渡せる
-
BodyInserters.fromDataBuffers(body) はそのまま WebClient に渡せる
構成例のサンプル
Controller
@PostMapping("/relay")
public Mono<String> relay(HttpServletRequest request) {
Flux<DataBuffer> body = DataBufferUtils.readInputStream(
request::getInputStream,
new DefaultDataBufferFactory(),
4096
);
return relayService.relayToOtherService(body);
}
RelayService
@Service
public class RelayService {
private final StreamingApiClient streamingApiClient;
public RelayService(StreamingApiClient streamingApiClient) {
this.streamingApiClient = streamingApiClient;
}
public Mono<String> relayToOtherService(Flux<DataBuffer> body) {
return streamingApiClient.sendStreaming(body);
}
}
StreamingApiClient(WebClientラッパー)
@Component
public class StreamingApiClient {
private final WebClient webClient;
public StreamingApiClient(WebClient.Builder builder) {
this.webClient = builder.baseUrl("https://api.example.com").build();
}
public Mono<String> sendStreaming(Flux<DataBuffer> body) {
return webClient.post()
.uri("/upload")
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(BodyInserters.fromDataBuffers(body))
.retrieve()
.bodyToMono(String.class);
}
}
なお、連携先に送る前に文字コード変換やるなら以下のようになる
Flux → 一度文字列化する
public Mono<String> forceConvertUtf8(Flux<DataBuffer> body) {
return DataBufferUtils.join(body)
.flatMap(buffer -> {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
DataBufferUtils.release(buffer);
// 一度UTF-8で文字列にし、またUTF-8で送る
String text = new String(bytes, StandardCharsets.UTF_8);
return existingClient.send(text);
});
}