はじめに
CosmosDBはMicrosoft Azureが提供する分散型のNoSQL(PostgresSQLなどRelational DBも実はある) データベース。世界中にデータが分散しているので、可用性がとても高く、そしてとても早い(らしい)。CosmosDBはただの便利なデータベースではなく、Change Feedと呼ばれるデータの追加、編集、消去といったイベントをメッセージブローカのようにパブリッシュする機能がついている。Change Feedを利用することにより、容易にマイクロサービスなどといった独立したサービスを開発していくことができる。今回の記事では、Java/Spring Boot を用いて、CosmosDBにレコードを追加し、そのレコード追加に関するChange Feedを別のSpring Boot アプリケーションから受信してみたいと思う。
リソースグループの作成
CosmosDB リソースの作成




ただの実験程度として遊ぶだけなら、Apply Free Tier Discount は絶対Applyにチェックする。




CosmosDBのリソースの作成は成功したら、データベースとleaseコンテイナーを作成する。leaseコンテイナーはChange Feedを利用するために必要なテーブル又はコレクション。

lease コンテイナーは Parition keyとして、idを必ず指定しなければならない。



Keysセクションでデータベースに接続するための情報を確認できる。

データベースに接続するために必要な情報は、URIとPRIMARY KEYの2つ。
プロデューサー側のSprint Boot アプリケーション
プロデューサー側として機能する、ConsmosDBにデータを保存する簡単なAPIサーバを作る。
必要ライブラリー
com.azure.spring:spring-cloud-azure-starter-data-cosmos Spring Boot アプリケーションからCosmosDBに接続するために絶対に必要。
plugins {
id 'org.springframework.boot' version '2.7.4'
id 'io.spring.dependency-management' version '1.0.14.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
ext {
set('springCloudAzureVersion', "4.4.0")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.azure.spring:spring-cloud-azure-starter-data-cosmos' // <= CosmosDB接続用
implementation 'org.springframework.boot:spring-boot-starter-validation:2.7.3'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
dependencyManagement {
imports {
mavenBom "com.azure.spring:spring-cloud-azure-dependencies:${springCloudAzureVersion}"
}
}
tasks.named('test') {
useJUnitPlatform()
}
コントローラー
package com.example.changefeedproducer.controllers;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import javax.validation.Valid;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
import com.example.changefeedproducer.dtos.CreateUserDto;
import com.example.changefeedproducer.dtos.UpdateUserDto;
import com.example.changefeedproducer.dtos.UserDto;
import com.example.changefeedproducer.services.UserService;
import lombok.RequiredArgsConstructor;
@RestController
@RequestMapping(path = UserController.BASE_URL)
@RequiredArgsConstructor
public class UserController {
public static final String BASE_URL = "/api/v1/users";
private final UserService userService;
@GetMapping("")
public ResponseEntity<List<UserDto>> getUsers() {
return ResponseEntity.ok(new ArrayList<UserDto>(userService.getOnes()));
}
@GetMapping("/{id}")
public ResponseEntity<UserDto> getUser(@PathVariable String id) {
return ResponseEntity.ok(userService.getOne(id));
}
@PostMapping("")
public ResponseEntity<UserDto> createUser(@Valid @RequestBody CreateUserDto dto) {
UserDto newUser = userService.createOne(dto);
URI location = ServletUriComponentsBuilder.fromCurrentRequest().path("/{id}")
.buildAndExpand(newUser.getId()).toUri();
return ResponseEntity.created(location).body(newUser);
}
@DeleteMapping("/{id}")
public ResponseEntity<Object> deleteUser(@PathVariable String id) {
userService.deleteOne(id);
return ResponseEntity.noContent().build();
}
@PatchMapping("/{id}")
public ResponseEntity<Object> updateUser(@PathVariable String id, @Valid @RequestBody UpdateUserDto dto) {
userService.updateOne(dto, id);
return ResponseEntity.noContent().build();
}
}
サービス
package com.example.changefeedproducer.services;
import java.util.Collection;
import com.example.changefeedproducer.dtos.CreateUserDto;
import com.example.changefeedproducer.dtos.UpdateUserDto;
import com.example.changefeedproducer.dtos.UserDto;
public interface UserService {
Collection<? extends UserDto> getOnes();
UserDto getOne(String id);
void deleteOne(String id);
void updateOne(UpdateUserDto dto, String id);
UserDto createOne(CreateUserDto dto);
}
package com.example.changefeedproducer.services;
import java.util.Collection;
import java.util.UUID;
import org.springframework.stereotype.Service;
import com.example.changefeedproducer.dtos.CreateUserDto;
import com.example.changefeedproducer.dtos.UpdateUserDto;
import com.example.changefeedproducer.dtos.UserDto;
import com.example.changefeedproducer.models.User;
import com.example.changefeedproducer.repositories.UserRepository;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class UserServiceImpl implements UserService {
private final UserRepository userRepository;
@Override
public Collection<? extends UserDto> getOnes() {
return userRepository.findAll().toStream().map(userItem -> new UserDto(
userItem.getId(),
userItem.getFirstName(),
userItem.getLastName())).toList();
}
@Override
public UserDto getOne(String id) {
User existingUser = userRepository.findById(id).block();
return new UserDto(
existingUser.getId(),
existingUser.getFirstName(),
existingUser.getLastName());
}
@Override
public void deleteOne(String id) {
userRepository.deleteById(id).block();
}
@Override
public void updateOne(UpdateUserDto dto, String id) {
// TODO Auto-generated method stub
}
@Override
public UserDto createOne(CreateUserDto dto) {
User newUser = new User(
UUID.randomUUID().toString(),
dto.getFirstName(),
dto.getLastName());
final User saveUserMono = userRepository.save(newUser).block();
return new UserDto(
saveUserMono.getId(),
saveUserMono.getFirstName(),
saveUserMono.getLastName());
}
}
リポジトリー
reactiveプログラミングの良さは一ミリも使わないが,
ReactiveCosmosRepositoryをUserRepositoryに実装する。(ConsmosRepositoryもあるが、findAllがinterableを返しStream APIが使いにくいので、とりあえずReactiveの方を採用した。)
package com.example.changefeedproducer.repositories;
import com.azure.spring.data.cosmos.repository.ReactiveCosmosRepository;
import com.example.changefeedproducer.models.User;
import org.springframework.stereotype.Repository;
@Repository
public interface UserRepository extends ReactiveCosmosRepository<User, String> {
}
DTO(データトランスファオブジェクト)
package com.example.changefeedproducer.dtos;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserDto {
private String id;
private String firstName;
private String lastName;
}
package com.example.changefeedproducer.dtos;
import javax.validation.constraints.NotNull;
import org.hibernate.validator.constraints.Length;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CreateUserDto {
@NotNull
@Length(max = 50)
private String firstName;
@NotNull
@Length(max = 50)
private String lastName;
}
package com.example.changefeedproducer.dtos;
import org.hibernate.validator.constraints.Length;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UpdateUserDto {
@Length(max = 50)
private String firstName;
@Length(max = 50)
private String lastName;
}
データベースモデル
JPA並に、シンプルかつ簡単にデータベースモデルを定義できる。PartitionKeyもアノテーションでモデルから指定できるので、とても楽である。
package com.example.changefeedproducer.models;
import com.azure.spring.data.cosmos.core.mapping.Container;
import com.azure.spring.data.cosmos.core.mapping.PartitionKey;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
@Container(containerName = "users")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
@Id
private String id;
private String firstName;
@PartitionKey
private String lastName;
}
Mainファイル
package com.example.changefeedproducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ChangeFeedProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ChangeFeedProducerApplication.class, args);
}
}
application.yml
ConsmosDB に接続するための情報を追加する。endpointとkeyに値は azure portal => CosmosDBリソース => Keys からそれぞれ確認できる。

spring:
cloud:
azure:
cosmos:
endpoint: https://azosmosdbname.documents.azure.com:443/
key: od0eHTo8dyxaQO3MvTyFtpjYdFPuMJh6vjHPfYiddDt75Z9Pf60TGkWy1rOmZNopRHc1jgmgmsc727JgscyWsA==
database: try_cosmos
populate-query-metrics: true
レコードを追加してみる。
curl -i -X POST -H "Content-Type: application/json" -d '{"firstName": "Taro", "lastName": "Yamada"}' http://localhost:8080/api/v1/users
無事、idが付与されてレスポンスが返ってきたら成功。
HTTP/1.1 201
Location: http://localhost:8080/api/v1/users/75a82daf-3a74-45d3-b616-0be3df87d11c
Content-Type: application/json
Transfer-Encoding: chunked
Date: Fri, 21 Oct 2022 05:25:10 GMT
{"id":"75a82daf-3a74-45d3-b616-0be3df87d11c","firstName":"Taro","lastName":"Yamada"}
Azure portal の方からも新しく追加したusersのレコードを確認できる。

コンシューマ側のSprint Boot アプリケーション
package com.example.changefeedconsumer.config;
import java.time.Duration;
import java.util.List;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.implementation.guava25.collect.Lists;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.fasterxml.jackson.databind.JsonNode;
@Configuration
public class UsersChangeFeedProcessorConfig {
private String endpointUri = "https://azosmosdbname.documents.azure.com:443/";
private String accessKey = "od0eHTo8dyxaQO3MvTyFtpjYdFPuMJh6vjHPfYiddDt75Z9Pf60TGkWy1rOmZNopRHc1jgmgmsc727JgscyWsA==";
private String hostName = "host-1";
private String dbName = "try_cosmos";
private String feedContainerId = "users";
private String leaseContainerId = "lease";
private List<String> preferredRegions = Lists.newArrayList("Japan East");
@Bean
public ChangeFeedProcessor changeFeedProcessor() {
CosmosAsyncClient cosmosAsyncClient = new CosmosClientBuilder()
// CosmomsDB => Keys で確認できる URI
.endpoint(endpointUri)
// CosmomsDB => Keys で確認できる PRIMARY KEY
.key(accessKey)
.preferredRegions(preferredRegions)
.consistencyLevel(ConsistencyLevel.SESSION)
// trueじゃないと"leaseClient: content response on write setting must be enabled"と怒られる。
.contentResponseOnWriteEnabled(true)
.buildAsyncClient();
CosmosAsyncContainer feedContainer = cosmosAsyncClient.getDatabase(dbName).getContainer(feedContainerId);
CosmosAsyncContainer leaseContainer = cosmosAsyncClient.getDatabase(dbName).getContainer(leaseContainerId);
return new ChangeFeedProcessorBuilder()
.hostName(hostName)
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.handleChanges(
(List<JsonNode> docs) -> {
for (JsonNode document : docs) {
// Change Feed を処理
System.out.println("Processing Change feed...");
System.out.println(document.toPrettyString());
}
})
.buildChangeFeedProcessor();
}
}
package com.example.changefeedconsumer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.azure.cosmos.ChangeFeedProcessor;
import lombok.RequiredArgsConstructor;
@SpringBootApplication
@RequiredArgsConstructor
public class ChangeFeedConsumerApplication implements CommandLineRunner {
private final ChangeFeedProcessor changeFeedProcessor;
public static void main(String[] args) {
SpringApplication.run(ChangeFeedConsumerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
changeFeedProcessor.start().block();
}
}
ローカルで、同時に二つのSpring Bootアプリケーションを動かす場合にport番号を変えておく。
server:
port: 7070
プロデューサー側、コンシューマ側両方のSpring Bootアプリケーションが起動できたら、プロデューサー側にHttp Requestを送って、CosmosDBに新しいレコードを追加する。
curl -i -X POST -H "Content-Type: application/json" -d '{"firstName": "Taro", "lastName": "Yamada"}' http://localhost:8080/api/v1/users
HTTP/1.1 201
Location: http://localhost:8080/api/v1/users/8e52d8e9-e4b0-43b4-bf59-92ade5cefe0c
Content-Type: application/json
Transfer-Encoding: chunked
Date: Fri, 21 Oct 2022 05:25:10 GMT
{"id":"8e52d8e9-e4b0-43b4-bf59-92ade5cefe0c","firstName":"Taro","lastName":"Yamada"}
無事、CosmosDBに新しいレコードが追加されたら、コンシューマ側でChange Feedを受信して、コンソールに出力されていることが確認できる。
2022-10-21 14:42:12.128 INFO 94305 --- [oundedElastic-4] c.a.c.i.c.common.DefaultObserver : Start processing from thread 67
Processing Change feed...
{
"id" : "8e52d8e9-e4b0-43b4-bf59-92ade5cefe0c",
"firstName" : "First name",
"lastName" : "Last name",
"_rid" : "Pf5yALUpM3QEAAAAAAAAAA==",
"_self" : "dbs/Pf5yAA==/colls/Pf5yALUpM3Q=/docs/Pf5yALUpM3QEAAAAAAAAAA==/",
"_etag" : "\"18007e42-0000-2300-0000-635231340000\"",
"_attachments" : "attachments/",
"_ts" : 1666330932,
"_lsn" : 101
}
おわり
実際に、CosmosDBのChange Feedを受信することができた。しかし今回どうしても解決できなかった問題がある。Change Feedを処理したあとに、エラーが発生して、以下のようにとてつもなく長いエラーメッセージがコンソールに出力される。エラーの内容的に、azure側と通信した際になにか問題が起きているのか、正直さっぱりわからなく、諦めてしまった。何か解決策や心当たりがある人がいたら教えてほしい。
2022-10-21 15:21:10.398 ERROR 98962 --- [oundedElastic-2] c.a.c.i.d.RntbdTransportClient : Report this azure-cosmos-4.38.0.jar issue to ensure it is addressed:
[RntbdServiceEndpoint({"id":4,"closed":false,"concurrentRequests":1,"remoteAddress":"cdb-ms-prod-japaneast1-be12.documents.azure.com/<unresolved>:14007","channelPool":{"remoteAddress":"cdb-ms-prod-japaneast1-be12.documents.azure.com/<unresolved>:14007","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":false,"endpointCount":6,"endpointEvictionCount":0}})]
[com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.lambda$invokeStoreAsync$2(RntbdTransportClient.java:266)]
[request completed with an unexpected class java.util.concurrent.CancellationException: \{"record":AsyncRntbdRequestRecord({"args":{"transportRequestId":4394,"activityId":"8e182aea-5108-11ed-afe1-e5112e7ea2bd","origin":"rntbd://cdb-ms-prod-japaneast1-be12.documents.azure.com:14007","replicaPath":"/apps/b6a386ec-720b-44fc-95aa-6a03715c61c3/services/d646d164-7e64-4e0f-8da6-f1dc741dfb20/partitions/c7606e13-1dc6-4460-9b10-3d2ec4df2055/replicas/133107531143007136p","timeCreated":"2022-10-21T06:21:10.389746Z","lifetime":"PT0.006526125S"},"requestLength":463,"responseLength":-1,"status":{"done":false,"cancelled":false,"completedExceptionally":false},"timeline":[{"eventName":"created","startTimeUTC":"2022-10-21T06:21:10.389746Z","durationInMilliSecs":0.014},{"eventName":"queued","startTimeUTC":"2022-10-21T06:21:10.389760Z","durationInMilliSecs":0.001},{"eventName":"channelAcquisitionStarted","startTimeUTC":"2022-10-21T06:21:10.389761Z","durationInMilliSecs":2.219},{"eventName":"pipelined","startTimeUTC":"2022-10-21T06:21:10.391980Z","durationInMilliSecs":0.663},{"eventName":"transitTime","startTimeUTC":"2022-10-21T06:21:10.392643Z","durationInMilliSecs":3.658},{"eventName":"decodeTime","startTimeUTC":null,"durationInMilliSecs":0.0},{"eventName":"received","startTimeUTC":null,"durationInMilliSecs":0.0},{"eventName":"completed","startTimeUTC":null,"durationInMilliSecs":0.0}]}),"error":{"cause":null,"stackTrace":[{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"cancel","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.CompletableFuture","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoCompletionStage.java","lineNumber":56,"className":"reactor.core.publisher.MonoCompletionStage$1","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxDoFinally.java","lineNumber":134,"className":"reactor.core.publisher.FluxDoFinally$DoFinallySubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxContextWrite.java","lineNumber":141,"className":"reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":359,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":1022,"className":"reactor.core.publisher.FluxFlatMap$FlatMapInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":340,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":219,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribe","fileName":"FluxFlatMap.java","lineNumber":1083,"className":"reactor.core.publisher.FlatMapTracker","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":360,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoCollectList.java","lineNumber":144,"className":"reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMap.java","lineNumber":169,"className":"reactor.core.publisher.FluxMap$MapSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxTakeUntil.java","lineNumber":146,"className":"reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoIgnoreElements.java","lineNumber":104,"className":"reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxConcatArray.java","lineNumber":286,"className":"reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":1022,"className":"reactor.core.publisher.FluxFlatMap$FlatMapInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":340,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":219,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribe","fileName":"FluxFlatMap.java","lineNumber":1083,"className":"reactor.core.publisher.FlatMapTracker","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":360,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"doOnCancel","fileName":"MonoSingle.java","lineNumber":108,"className":"reactor.core.publisher.MonoSingle$SingleSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2784,"className":"reactor.core.publisher.Operators$MonoInnerProducerBase","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxPeekFuseable.java","lineNumber":159,"className":"reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"doOnCancel","fileName":"MonoSingle.java","lineNumber":108,"className":"reactor.core.publisher.MonoSingle$SingleSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2784,"className":"reactor.core.publisher.Operators$MonoInnerProducerBase","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxRetryWhen.java","lineNumber":163,"className":"reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"SerializedSubscriber.java","lineNumber":157,"className":"reactor.core.publisher.SerializedSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoPeekTerminal.java","lineNumber":144,"className":"reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":272,"className":"reactor.core.publisher.MonoFlatMap$FlatMapInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":188,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMapFuseable.java","lineNumber":176,"className":"reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMapFuseable.java","lineNumber":176,"className":"reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxRepeatWhen.java","lineNumber":136,"className":"reactor.core.publisher.FluxRepeatWhen$RepeatWhenMainSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"SerializedSubscriber.java","lineNumber":157,"className":"reactor.core.publisher.SerializedSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoNext.java","lineNumber":114,"className":"reactor.core.publisher.MonoNext$NextSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"set","fileName":"Operators.java","lineNumber":1160,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMergeSequential.java","lineNumber":601,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancelAll","fileName":"FluxMergeSequential.java","lineNumber":285,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainAndCancel","fileName":"FluxMergeSequential.java","lineNumber":276,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMergeSequential.java","lineNumber":270,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMap.java","lineNumber":169,"className":"reactor.core.publisher.FluxMap$MapSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMapMany.java","lineNumber":131,"className":"reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxPublishOn.java","lineNumber":277,"className":"reactor.core.publisher.FluxPublishOn$PublishOnSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"onNext","fileName":"FluxLimitRequest.java","lineNumber":103,"className":"reactor.core.publisher.FluxLimitRequest$FluxLimitRequestSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"runAsync","fileName":"FluxPublishOn.java","lineNumber":440,"className":"reactor.core.publisher.FluxPublishOn$PublishOnSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"run","fileName":"FluxPublishOn.java","lineNumber":527,"className":"reactor.core.publisher.FluxPublishOn$PublishOnSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"call","fileName":"WorkerTask.java","lineNumber":84,"className":"reactor.core.scheduler.WorkerTask","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"call","fileName":"WorkerTask.java","lineNumber":37,"className":"reactor.core.scheduler.WorkerTask","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.FutureTask","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"runWorker","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.ThreadPoolExecutor","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.ThreadPoolExecutor$Worker","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.lang.Thread","nativeMethod":false}],"message":null,"suppressed":[],"localizedMessage":null}}: com.azure.core.exception.AzureException
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter.doReportIssue(RntbdReporter.java:66)
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter.reportIssue(RntbdReporter.java:43)
at com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.lambda$invokeStoreAsync$2(RntbdTransportClient.java:266)
at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3776)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:89)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(Unknown Source)
at reactor.core.publisher.MonoCompletionStage.subscribe(MonoCompletionStage.java:67)
at reactor.core.publisher.Mono.subscribe(Mono.java:4455)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4570)
at reactor.core.publisher.Mono.subscribe(Mono.java:4422)
at reactor.core.publisher.Mono.subscribe(Mono.java:4358)
at reactor.core.publisher.Mono.subscribe(Mono.java:4330)
at com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.lambda$invokeStoreAsync$5(RntbdTransportClient.java:329)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:146)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:135)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:359)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.cancel(MonoCollectList.java:144)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.cancel(FluxTakeUntil.java:146)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.cancel(MonoIgnoreElements.java:104)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.cancel(FluxConcatArray.java:286)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnCancel(MonoSingle.java:108)
at reactor.core.publisher.Operators$MonoInnerProducerBase.cancel(Operators.java:2784)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnCancel(MonoSingle.java:108)
at reactor.core.publisher.Operators$MonoInnerProducerBase.cancel(Operators.java:2784)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.cancel(FluxRetryWhen.java:163)
at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.cancel(MonoFlatMap.java:272)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:188)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
at reactor.core.publisher.FluxRepeatWhen$RepeatWhenMainSubscriber.cancel(FluxRepeatWhen.java:136)
at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
at reactor.core.publisher.MonoNext$NextSubscriber.cancel(MonoNext.java:114)
at reactor.core.publisher.Operators.set(Operators.java:1160)
at reactor.core.publisher.FluxMergeSequential$MergeSequentialInner.cancel(FluxMergeSequential.java:601)
at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.cancelAll(FluxMergeSequential.java:285)
at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drainAndCancel(FluxMergeSequential.java:276)
at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.cancel(FluxMergeSequential.java:270)
at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:131)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.cancel(FluxPublishOn.java:277)
at reactor.core.publisher.FluxLimitRequest$FluxLimitRequestSubscriber.onNext(FluxLimitRequest.java:103)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
]



