はじめに
CosmosDBはMicrosoft Azureが提供する分散型のNoSQL(PostgresSQLなどRelational DBも実はある) データベース。世界中にデータが分散しているので、可用性がとても高く、そしてとても早い(らしい)。CosmosDBはただの便利なデータベースではなく、Change Feedと呼ばれるデータの追加、編集、消去といったイベントをメッセージブローカのようにパブリッシュする機能がついている。Change Feedを利用することにより、容易にマイクロサービスなどといった独立したサービスを開発していくことができる。今回の記事では、Java/Spring Boot を用いて、CosmosDBにレコードを追加し、そのレコード追加に関するChange Feedを別のSpring Boot アプリケーションから受信してみたいと思う。
リソースグループの作成
CosmosDB リソースの作成
ただの実験程度として遊ぶだけなら、Apply Free Tier Discount
は絶対Apply
にチェックする。
CosmosDBのリソースの作成は成功したら、データベースとlease
コンテイナーを作成する。lease
コンテイナーはChange Feedを利用するために必要なテーブル又はコレクション。
lease
コンテイナーは Parition key
として、id
を必ず指定しなければならない。
Keys
セクションでデータベースに接続するための情報を確認できる。
データベースに接続するために必要な情報は、URI
とPRIMARY KEY
の2つ。
プロデューサー側のSprint Boot アプリケーション
プロデューサー側として機能する、ConsmosDBにデータを保存する簡単なAPIサーバを作る。
必要ライブラリー
com.azure.spring:spring-cloud-azure-starter-data-cosmos
Spring Boot アプリケーションからCosmosDBに接続するために絶対に必要。
plugins {
id 'org.springframework.boot' version '2.7.4'
id 'io.spring.dependency-management' version '1.0.14.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
ext {
set('springCloudAzureVersion', "4.4.0")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.azure.spring:spring-cloud-azure-starter-data-cosmos' // <= CosmosDB接続用
implementation 'org.springframework.boot:spring-boot-starter-validation:2.7.3'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
dependencyManagement {
imports {
mavenBom "com.azure.spring:spring-cloud-azure-dependencies:${springCloudAzureVersion}"
}
}
tasks.named('test') {
useJUnitPlatform()
}
コントローラー
package com.example.changefeedproducer.controllers;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import javax.validation.Valid;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
import com.example.changefeedproducer.dtos.CreateUserDto;
import com.example.changefeedproducer.dtos.UpdateUserDto;
import com.example.changefeedproducer.dtos.UserDto;
import com.example.changefeedproducer.services.UserService;
import lombok.RequiredArgsConstructor;
@RestController
@RequestMapping(path = UserController.BASE_URL)
@RequiredArgsConstructor
public class UserController {
public static final String BASE_URL = "/api/v1/users";
private final UserService userService;
@GetMapping("")
public ResponseEntity<List<UserDto>> getUsers() {
return ResponseEntity.ok(new ArrayList<UserDto>(userService.getOnes()));
}
@GetMapping("/{id}")
public ResponseEntity<UserDto> getUser(@PathVariable String id) {
return ResponseEntity.ok(userService.getOne(id));
}
@PostMapping("")
public ResponseEntity<UserDto> createUser(@Valid @RequestBody CreateUserDto dto) {
UserDto newUser = userService.createOne(dto);
URI location = ServletUriComponentsBuilder.fromCurrentRequest().path("/{id}")
.buildAndExpand(newUser.getId()).toUri();
return ResponseEntity.created(location).body(newUser);
}
@DeleteMapping("/{id}")
public ResponseEntity<Object> deleteUser(@PathVariable String id) {
userService.deleteOne(id);
return ResponseEntity.noContent().build();
}
@PatchMapping("/{id}")
public ResponseEntity<Object> updateUser(@PathVariable String id, @Valid @RequestBody UpdateUserDto dto) {
userService.updateOne(dto, id);
return ResponseEntity.noContent().build();
}
}
サービス
package com.example.changefeedproducer.services;
import java.util.Collection;
import com.example.changefeedproducer.dtos.CreateUserDto;
import com.example.changefeedproducer.dtos.UpdateUserDto;
import com.example.changefeedproducer.dtos.UserDto;
public interface UserService {
Collection<? extends UserDto> getOnes();
UserDto getOne(String id);
void deleteOne(String id);
void updateOne(UpdateUserDto dto, String id);
UserDto createOne(CreateUserDto dto);
}
package com.example.changefeedproducer.services;
import java.util.Collection;
import java.util.UUID;
import org.springframework.stereotype.Service;
import com.example.changefeedproducer.dtos.CreateUserDto;
import com.example.changefeedproducer.dtos.UpdateUserDto;
import com.example.changefeedproducer.dtos.UserDto;
import com.example.changefeedproducer.models.User;
import com.example.changefeedproducer.repositories.UserRepository;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class UserServiceImpl implements UserService {
private final UserRepository userRepository;
@Override
public Collection<? extends UserDto> getOnes() {
return userRepository.findAll().toStream().map(userItem -> new UserDto(
userItem.getId(),
userItem.getFirstName(),
userItem.getLastName())).toList();
}
@Override
public UserDto getOne(String id) {
User existingUser = userRepository.findById(id).block();
return new UserDto(
existingUser.getId(),
existingUser.getFirstName(),
existingUser.getLastName());
}
@Override
public void deleteOne(String id) {
userRepository.deleteById(id).block();
}
@Override
public void updateOne(UpdateUserDto dto, String id) {
// TODO Auto-generated method stub
}
@Override
public UserDto createOne(CreateUserDto dto) {
User newUser = new User(
UUID.randomUUID().toString(),
dto.getFirstName(),
dto.getLastName());
final User saveUserMono = userRepository.save(newUser).block();
return new UserDto(
saveUserMono.getId(),
saveUserMono.getFirstName(),
saveUserMono.getLastName());
}
}
リポジトリー
reactiveプログラミングの良さは一ミリも使わないが,
ReactiveCosmosRepository
をUserRepository
に実装する。(ConsmosRepository
もあるが、findAllがinterable
を返しStream APIが使いにくいので、とりあえずReactive
の方を採用した。)
package com.example.changefeedproducer.repositories;
import com.azure.spring.data.cosmos.repository.ReactiveCosmosRepository;
import com.example.changefeedproducer.models.User;
import org.springframework.stereotype.Repository;
@Repository
public interface UserRepository extends ReactiveCosmosRepository<User, String> {
}
DTO(データトランスファオブジェクト)
package com.example.changefeedproducer.dtos;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserDto {
private String id;
private String firstName;
private String lastName;
}
package com.example.changefeedproducer.dtos;
import javax.validation.constraints.NotNull;
import org.hibernate.validator.constraints.Length;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CreateUserDto {
@NotNull
@Length(max = 50)
private String firstName;
@NotNull
@Length(max = 50)
private String lastName;
}
package com.example.changefeedproducer.dtos;
import org.hibernate.validator.constraints.Length;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UpdateUserDto {
@Length(max = 50)
private String firstName;
@Length(max = 50)
private String lastName;
}
データベースモデル
JPA並に、シンプルかつ簡単にデータベースモデルを定義できる。PartitionKey
もアノテーションでモデルから指定できるので、とても楽である。
package com.example.changefeedproducer.models;
import com.azure.spring.data.cosmos.core.mapping.Container;
import com.azure.spring.data.cosmos.core.mapping.PartitionKey;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
@Container(containerName = "users")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
@Id
private String id;
private String firstName;
@PartitionKey
private String lastName;
}
Mainファイル
package com.example.changefeedproducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ChangeFeedProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ChangeFeedProducerApplication.class, args);
}
}
application.yml
ConsmosDB に接続するための情報を追加する。endpoint
とkey
に値は azure portal => CosmosDBリソース => Keys からそれぞれ確認できる。
spring:
cloud:
azure:
cosmos:
endpoint: https://azosmosdbname.documents.azure.com:443/
key: od0eHTo8dyxaQO3MvTyFtpjYdFPuMJh6vjHPfYiddDt75Z9Pf60TGkWy1rOmZNopRHc1jgmgmsc727JgscyWsA==
database: try_cosmos
populate-query-metrics: true
レコードを追加してみる。
curl -i -X POST -H "Content-Type: application/json" -d '{"firstName": "Taro", "lastName": "Yamada"}' http://localhost:8080/api/v1/users
無事、id
が付与されてレスポンスが返ってきたら成功。
HTTP/1.1 201
Location: http://localhost:8080/api/v1/users/75a82daf-3a74-45d3-b616-0be3df87d11c
Content-Type: application/json
Transfer-Encoding: chunked
Date: Fri, 21 Oct 2022 05:25:10 GMT
{"id":"75a82daf-3a74-45d3-b616-0be3df87d11c","firstName":"Taro","lastName":"Yamada"}
Azure portal の方からも新しく追加したusers
のレコードを確認できる。
コンシューマ側のSprint Boot アプリケーション
package com.example.changefeedconsumer.config;
import java.time.Duration;
import java.util.List;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.implementation.guava25.collect.Lists;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.fasterxml.jackson.databind.JsonNode;
@Configuration
public class UsersChangeFeedProcessorConfig {
private String endpointUri = "https://azosmosdbname.documents.azure.com:443/";
private String accessKey = "od0eHTo8dyxaQO3MvTyFtpjYdFPuMJh6vjHPfYiddDt75Z9Pf60TGkWy1rOmZNopRHc1jgmgmsc727JgscyWsA==";
private String hostName = "host-1";
private String dbName = "try_cosmos";
private String feedContainerId = "users";
private String leaseContainerId = "lease";
private List<String> preferredRegions = Lists.newArrayList("Japan East");
@Bean
public ChangeFeedProcessor changeFeedProcessor() {
CosmosAsyncClient cosmosAsyncClient = new CosmosClientBuilder()
// CosmomsDB => Keys で確認できる URI
.endpoint(endpointUri)
// CosmomsDB => Keys で確認できる PRIMARY KEY
.key(accessKey)
.preferredRegions(preferredRegions)
.consistencyLevel(ConsistencyLevel.SESSION)
// trueじゃないと"leaseClient: content response on write setting must be enabled"と怒られる。
.contentResponseOnWriteEnabled(true)
.buildAsyncClient();
CosmosAsyncContainer feedContainer = cosmosAsyncClient.getDatabase(dbName).getContainer(feedContainerId);
CosmosAsyncContainer leaseContainer = cosmosAsyncClient.getDatabase(dbName).getContainer(leaseContainerId);
return new ChangeFeedProcessorBuilder()
.hostName(hostName)
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.handleChanges(
(List<JsonNode> docs) -> {
for (JsonNode document : docs) {
// Change Feed を処理
System.out.println("Processing Change feed...");
System.out.println(document.toPrettyString());
}
})
.buildChangeFeedProcessor();
}
}
package com.example.changefeedconsumer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.azure.cosmos.ChangeFeedProcessor;
import lombok.RequiredArgsConstructor;
@SpringBootApplication
@RequiredArgsConstructor
public class ChangeFeedConsumerApplication implements CommandLineRunner {
private final ChangeFeedProcessor changeFeedProcessor;
public static void main(String[] args) {
SpringApplication.run(ChangeFeedConsumerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
changeFeedProcessor.start().block();
}
}
ローカルで、同時に二つのSpring Bootアプリケーションを動かす場合にport番号を変えておく。
server:
port: 7070
プロデューサー側、コンシューマ側両方のSpring Bootアプリケーションが起動できたら、プロデューサー側にHttp Requestを送って、CosmosDBに新しいレコードを追加する。
curl -i -X POST -H "Content-Type: application/json" -d '{"firstName": "Taro", "lastName": "Yamada"}' http://localhost:8080/api/v1/users
HTTP/1.1 201
Location: http://localhost:8080/api/v1/users/8e52d8e9-e4b0-43b4-bf59-92ade5cefe0c
Content-Type: application/json
Transfer-Encoding: chunked
Date: Fri, 21 Oct 2022 05:25:10 GMT
{"id":"8e52d8e9-e4b0-43b4-bf59-92ade5cefe0c","firstName":"Taro","lastName":"Yamada"}
無事、CosmosDBに新しいレコードが追加されたら、コンシューマ側でChange Feedを受信して、コンソールに出力されていることが確認できる。
2022-10-21 14:42:12.128 INFO 94305 --- [oundedElastic-4] c.a.c.i.c.common.DefaultObserver : Start processing from thread 67
Processing Change feed...
{
"id" : "8e52d8e9-e4b0-43b4-bf59-92ade5cefe0c",
"firstName" : "First name",
"lastName" : "Last name",
"_rid" : "Pf5yALUpM3QEAAAAAAAAAA==",
"_self" : "dbs/Pf5yAA==/colls/Pf5yALUpM3Q=/docs/Pf5yALUpM3QEAAAAAAAAAA==/",
"_etag" : "\"18007e42-0000-2300-0000-635231340000\"",
"_attachments" : "attachments/",
"_ts" : 1666330932,
"_lsn" : 101
}
おわり
実際に、CosmosDBのChange Feedを受信することができた。しかし今回どうしても解決できなかった問題がある。Change Feedを処理したあとに、エラーが発生して、以下のようにとてつもなく長いエラーメッセージがコンソールに出力される。エラーの内容的に、azure側と通信した際になにか問題が起きているのか、正直さっぱりわからなく、諦めてしまった。何か解決策や心当たりがある人がいたら教えてほしい。
2022-10-21 15:21:10.398 ERROR 98962 --- [oundedElastic-2] c.a.c.i.d.RntbdTransportClient : Report this azure-cosmos-4.38.0.jar issue to ensure it is addressed:
[RntbdServiceEndpoint({"id":4,"closed":false,"concurrentRequests":1,"remoteAddress":"cdb-ms-prod-japaneast1-be12.documents.azure.com/<unresolved>:14007","channelPool":{"remoteAddress":"cdb-ms-prod-japaneast1-be12.documents.azure.com/<unresolved>:14007","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":false,"endpointCount":6,"endpointEvictionCount":0}})]
[com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.lambda$invokeStoreAsync$2(RntbdTransportClient.java:266)]
[request completed with an unexpected class java.util.concurrent.CancellationException: \{"record":AsyncRntbdRequestRecord({"args":{"transportRequestId":4394,"activityId":"8e182aea-5108-11ed-afe1-e5112e7ea2bd","origin":"rntbd://cdb-ms-prod-japaneast1-be12.documents.azure.com:14007","replicaPath":"/apps/b6a386ec-720b-44fc-95aa-6a03715c61c3/services/d646d164-7e64-4e0f-8da6-f1dc741dfb20/partitions/c7606e13-1dc6-4460-9b10-3d2ec4df2055/replicas/133107531143007136p","timeCreated":"2022-10-21T06:21:10.389746Z","lifetime":"PT0.006526125S"},"requestLength":463,"responseLength":-1,"status":{"done":false,"cancelled":false,"completedExceptionally":false},"timeline":[{"eventName":"created","startTimeUTC":"2022-10-21T06:21:10.389746Z","durationInMilliSecs":0.014},{"eventName":"queued","startTimeUTC":"2022-10-21T06:21:10.389760Z","durationInMilliSecs":0.001},{"eventName":"channelAcquisitionStarted","startTimeUTC":"2022-10-21T06:21:10.389761Z","durationInMilliSecs":2.219},{"eventName":"pipelined","startTimeUTC":"2022-10-21T06:21:10.391980Z","durationInMilliSecs":0.663},{"eventName":"transitTime","startTimeUTC":"2022-10-21T06:21:10.392643Z","durationInMilliSecs":3.658},{"eventName":"decodeTime","startTimeUTC":null,"durationInMilliSecs":0.0},{"eventName":"received","startTimeUTC":null,"durationInMilliSecs":0.0},{"eventName":"completed","startTimeUTC":null,"durationInMilliSecs":0.0}]}),"error":{"cause":null,"stackTrace":[{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"cancel","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.CompletableFuture","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoCompletionStage.java","lineNumber":56,"className":"reactor.core.publisher.MonoCompletionStage$1","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxDoFinally.java","lineNumber":134,"className":"reactor.core.publisher.FluxDoFinally$DoFinallySubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxContextWrite.java","lineNumber":141,"className":"reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":359,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":1022,"className":"reactor.core.publisher.FluxFlatMap$FlatMapInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":340,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":219,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribe","fileName":"FluxFlatMap.java","lineNumber":1083,"className":"reactor.core.publisher.FlatMapTracker","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":360,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoCollectList.java","lineNumber":144,"className":"reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMap.java","lineNumber":169,"className":"reactor.core.publisher.FluxMap$MapSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxTakeUntil.java","lineNumber":146,"className":"reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoIgnoreElements.java","lineNumber":104,"className":"reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxConcatArray.java","lineNumber":286,"className":"reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":1022,"className":"reactor.core.publisher.FluxFlatMap$FlatMapInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":340,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":219,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribe","fileName":"FluxFlatMap.java","lineNumber":1083,"className":"reactor.core.publisher.FlatMapTracker","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":360,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"doOnCancel","fileName":"MonoSingle.java","lineNumber":108,"className":"reactor.core.publisher.MonoSingle$SingleSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2784,"className":"reactor.core.publisher.Operators$MonoInnerProducerBase","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxPeekFuseable.java","lineNumber":159,"className":"reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"doOnCancel","fileName":"MonoSingle.java","lineNumber":108,"className":"reactor.core.publisher.MonoSingle$SingleSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2784,"className":"reactor.core.publisher.Operators$MonoInnerProducerBase","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxRetryWhen.java","lineNumber":163,"className":"reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"SerializedSubscriber.java","lineNumber":157,"className":"reactor.core.publisher.SerializedSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoPeekTerminal.java","lineNumber":144,"className":"reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":272,"className":"reactor.core.publisher.MonoFlatMap$FlatMapInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":188,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMapFuseable.java","lineNumber":176,"className":"reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMapFuseable.java","lineNumber":176,"className":"reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxRepeatWhen.java","lineNumber":136,"className":"reactor.core.publisher.FluxRepeatWhen$RepeatWhenMainSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"SerializedSubscriber.java","lineNumber":157,"className":"reactor.core.publisher.SerializedSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoNext.java","lineNumber":114,"className":"reactor.core.publisher.MonoNext$NextSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"set","fileName":"Operators.java","lineNumber":1160,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMergeSequential.java","lineNumber":601,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancelAll","fileName":"FluxMergeSequential.java","lineNumber":285,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainAndCancel","fileName":"FluxMergeSequential.java","lineNumber":276,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMergeSequential.java","lineNumber":270,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMap.java","lineNumber":169,"className":"reactor.core.publisher.FluxMap$MapSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMapMany.java","lineNumber":131,"className":"reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxPublishOn.java","lineNumber":277,"className":"reactor.core.publisher.FluxPublishOn$PublishOnSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"onNext","fileName":"FluxLimitRequest.java","lineNumber":103,"className":"reactor.core.publisher.FluxLimitRequest$FluxLimitRequestSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"runAsync","fileName":"FluxPublishOn.java","lineNumber":440,"className":"reactor.core.publisher.FluxPublishOn$PublishOnSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"run","fileName":"FluxPublishOn.java","lineNumber":527,"className":"reactor.core.publisher.FluxPublishOn$PublishOnSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"call","fileName":"WorkerTask.java","lineNumber":84,"className":"reactor.core.scheduler.WorkerTask","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"call","fileName":"WorkerTask.java","lineNumber":37,"className":"reactor.core.scheduler.WorkerTask","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.FutureTask","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"runWorker","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.ThreadPoolExecutor","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.ThreadPoolExecutor$Worker","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.lang.Thread","nativeMethod":false}],"message":null,"suppressed":[],"localizedMessage":null}}: com.azure.core.exception.AzureException
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter.doReportIssue(RntbdReporter.java:66)
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter.reportIssue(RntbdReporter.java:43)
at com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.lambda$invokeStoreAsync$2(RntbdTransportClient.java:266)
at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3776)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:89)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(Unknown Source)
at reactor.core.publisher.MonoCompletionStage.subscribe(MonoCompletionStage.java:67)
at reactor.core.publisher.Mono.subscribe(Mono.java:4455)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4570)
at reactor.core.publisher.Mono.subscribe(Mono.java:4422)
at reactor.core.publisher.Mono.subscribe(Mono.java:4358)
at reactor.core.publisher.Mono.subscribe(Mono.java:4330)
at com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.lambda$invokeStoreAsync$5(RntbdTransportClient.java:329)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:146)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:135)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:359)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.cancel(MonoCollectList.java:144)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.cancel(FluxTakeUntil.java:146)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.cancel(MonoIgnoreElements.java:104)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.cancel(FluxConcatArray.java:286)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnCancel(MonoSingle.java:108)
at reactor.core.publisher.Operators$MonoInnerProducerBase.cancel(Operators.java:2784)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnCancel(MonoSingle.java:108)
at reactor.core.publisher.Operators$MonoInnerProducerBase.cancel(Operators.java:2784)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.cancel(FluxRetryWhen.java:163)
at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.cancel(MonoFlatMap.java:272)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:188)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.FluxRepeatWhen$RepeatWhenMainSubscriber.cancel(FluxRepeatWhen.java:136)
at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
at reactor.core.publisher.MonoNext$NextSubscriber.cancel(MonoNext.java:114)
at reactor.core.publisher.Operators.set(Operators.java:1160)
at reactor.core.publisher.FluxMergeSequential$MergeSequentialInner.cancel(FluxMergeSequential.java:601)
at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.cancelAll(FluxMergeSequential.java:285)
at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drainAndCancel(FluxMergeSequential.java:276)
at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.cancel(FluxMergeSequential.java:270)
at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:131)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.cancel(FluxPublishOn.java:277)
at reactor.core.publisher.FluxLimitRequest$FluxLimitRequestSubscriber.onNext(FluxLimitRequest.java:103)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
]