Receiving the CosmosDB Change Feed

Posted at 2022-10-21

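Introduction

CosmosDB is a distributed NoSQL database provided by Microsoft Azure (it actually has relational offerings too, such as PostgreSQL). Because the data is distributed around the world, availability is very high and it is (reportedly) very fast. CosmosDB is more than a convenient database: it has a feature called Change Feed that publishes data-change events such as inserts and updates, much like a message broker (deletes are not captured in the default latest-version mode). Using the Change Feed makes it easy to develop independent services such as microservices. In this article I use Java/Spring Boot to add a record to CosmosDB and then receive the Change Feed for that insert from a separate Spring Boot application.

Creating a resource group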

Screen Shot 2022-10-20 at 13.07.34.png
Screen Shot 2022-10-20 at 13.26.14.png
Screen Shot 2022-10-20 at 13.10.08.png
Screen Shot 2022-10-20 at 13.10.40.png

Creating the CosmosDB resource

Screen Shot 2022-10-20 at 13.11.49.png
Screen Shot 2022-10-20 at 13.12.51.png
Screen Shot 2022-10-20 at 13.13.35.png
Screen Shot 2022-10-20 at 13.21.24.png
If you are only experimenting, be sure to select Apply for Apply Free Tier Discount.
Screen Shot 2022-10-20 at 13.21.56.png
Screen Shot 2022-10-20 at 13.27.32.png
Screen Shot 2022-10-20 at 13.38.03.png
Screen Shot 2022-10-20 at 13.38.35.png
Once the CosmosDB resource has been created, create a database and a lease container. The lease container is the table (or collection) required to use the Change Feed.
Screen Shot 2022-10-20 at 13.42.04.png
The lease container must use id as its partition key.
Screen Shot 2022-10-20 at 13.43.38.png
Screen Shot 2022-10-20 at 13.45.42.png
Screen Shot 2022-10-20 at 13.49.48.png
The Keys section shows the information needed to connect to the database.
Screen Shot 2022-10-20 at 13.53.02.png
Two values are needed to connect to the database: URI and PRIMARY KEY.
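
To make the role of the lease container more concrete, here is a minimal in-memory sketch of the checkpointing idea: each consumer remembers how far it has read, so a later poll returns only newer changes. The class and method names (`LeaseSketch`, `append`, `poll`) are hypothetical, and this is a simplified illustration rather than how the Azure SDK actually stores its lease documents.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration only; not the Azure SDK's lease format.
class LeaseSketch {
    // Stand-in for the monitored container's change feed: an append-only list.
    private final List<String> feed = new ArrayList<>();
    // Stand-in for the lease container: lease id -> index of the next unread change.
    private final Map<String, Integer> leases = new HashMap<>();

    void append(String change) {
        feed.add(change);
    }

    // Returns the changes not yet seen under this lease, then checkpoints.
    List<String> poll(String leaseId) {
        int from = leases.getOrDefault(leaseId, 0);
        List<String> batch = new ArrayList<>(feed.subList(from, feed.size()));
        leases.put(leaseId, feed.size());
        return batch;
    }
}
```

Because the checkpoint lives outside the consumer process, a restarted consumer that uses the same lease id would continue where it left off instead of re-reading everything.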

Producer-side Spring Boot application

For the producer side, we build a simple API server that saves data to CosmosDB.

Required libraries

com.azure.spring:spring-cloud-azure-starter-data-cosmos: required to connect to CosmosDB from a Spring Boot application.

build.gradle
plugins {
	id 'org.springframework.boot' version '2.7.4'
	id 'io.spring.dependency-management' version '1.0.14.RELEASE'
	id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

ext {
	set('springCloudAzureVersion', "4.4.0")
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
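	implementation 'com.azure.spring:spring-cloud-azure-starter-data-cosmos' // <= for connecting to CosmosDB
	// No explicit version: the Spring Boot plugin's dependency management supplies one matching 2.7.4.
	implementation 'org.springframework.boot:spring-boot-starter-validation'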
	compileOnly 'org.projectlombok:lombok'
	developmentOnly 'org.springframework.boot:spring-boot-devtools'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

dependencyManagement {
	imports {
		mavenBom "com.azure.spring:spring-cloud-azure-dependencies:${springCloudAzureVersion}"
	}
}

tasks.named('test') {
	useJUnitPlatform()
}

Controller

UserController.java
package com.example.changefeedproducer.controllers;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import javax.validation.Valid;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.support.ServletUriComponentsBuilder;

import com.example.changefeedproducer.dtos.CreateUserDto;
import com.example.changefeedproducer.dtos.UpdateUserDto;
import com.example.changefeedproducer.dtos.UserDto;
import com.example.changefeedproducer.services.UserService;

import lombok.RequiredArgsConstructor;

@RestController
@RequestMapping(path = UserController.BASE_URL)
@RequiredArgsConstructor
public class UserController {
    public static final String BASE_URL = "/api/v1/users";

    private final UserService userService;

    @GetMapping("")
    public ResponseEntity<List<UserDto>> getUsers() {
        return ResponseEntity.ok(new ArrayList<UserDto>(userService.getOnes()));
    }

    @GetMapping("/{id}")
    public ResponseEntity<UserDto> getUser(@PathVariable String id) {
        return ResponseEntity.ok(userService.getOne(id));
    }

    @PostMapping("")
    public ResponseEntity<UserDto> createUser(@Valid @RequestBody CreateUserDto dto) {
        UserDto newUser = userService.createOne(dto);
        URI location = ServletUriComponentsBuilder.fromCurrentRequest().path("/{id}")
                .buildAndExpand(newUser.getId()).toUri();
        return ResponseEntity.created(location).body(newUser);
    }

    @DeleteMapping("/{id}")
    public ResponseEntity<Object> deleteUser(@PathVariable String id) {
        userService.deleteOne(id);
        return ResponseEntity.noContent().build();
    }

    @PatchMapping("/{id}")
    public ResponseEntity<Object> updateUser(@PathVariable String id, @Valid @RequestBody UpdateUserDto dto) {
        userService.updateOne(dto, id);
        return ResponseEntity.noContent().build();
    }
}

Service

UserService.java
package com.example.changefeedproducer.services;

import java.util.Collection;

import com.example.changefeedproducer.dtos.CreateUserDto;
import com.example.changefeedproducer.dtos.UpdateUserDto;
import com.example.changefeedproducer.dtos.UserDto;

public interface UserService {

    Collection<? extends UserDto> getOnes();

    UserDto getOne(String id);

    void deleteOne(String id);

    void updateOne(UpdateUserDto dto, String id);

    UserDto createOne(CreateUserDto dto);

}
UserServiceImpl.java
package com.example.changefeedproducer.services;

import java.util.Collection;
import java.util.UUID;

import org.springframework.stereotype.Service;

import com.example.changefeedproducer.dtos.CreateUserDto;
import com.example.changefeedproducer.dtos.UpdateUserDto;
import com.example.changefeedproducer.dtos.UserDto;
import com.example.changefeedproducer.models.User;
import com.example.changefeedproducer.repositories.UserRepository;

import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class UserServiceImpl implements UserService {
    private final UserRepository userRepository;

    @Override
    public Collection<? extends UserDto> getOnes() {
        return userRepository.findAll().toStream().map(userItem -> new UserDto(
                userItem.getId(),
                userItem.getFirstName(),
                userItem.getLastName())).toList();
    }

    @Override
    public UserDto getOne(String id) {
        User existingUser = userRepository.findById(id).block();
        return new UserDto(
                existingUser.getId(),
                existingUser.getFirstName(),
                existingUser.getLastName());
    }

    @Override
    public void deleteOne(String id) {
        userRepository.deleteById(id).block();
    }

    @Override
    public void updateOne(UpdateUserDto dto, String id) {
        // TODO Auto-generated method stub

    }

    @Override
    public UserDto createOne(CreateUserDto dto) {
        User newUser = new User(
                UUID.randomUUID().toString(),
                dto.getFirstName(),
                dto.getLastName());
        final User saveUserMono = userRepository.save(newUser).block();
        return new UserDto(
                saveUserMono.getId(),
                saveUserMono.getFirstName(),
                saveUserMono.getLastName());
    }
}
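
The `updateOne` method above is left as a stub. One possible way to fill it in is to load the existing user, overlay only the non-null fields of the update DTO, and save the result. The sketch below shows just that merge step with plain records so it runs without Spring or the Azure SDK; `PatchSketch`, `User`, `UpdateUser`, and `merge` are hypothetical stand-ins, not the article's classes. One caveat to keep in mind: `lastName` is the partition key in this article's model, and CosmosDB does not allow changing an existing item's partition key value in place, so a real implementation would likely need to treat a `lastName` change as delete-and-recreate.

```java
// Hypothetical stand-ins for the article's User and UpdateUserDto.
class PatchSketch {
    record User(String id, String firstName, String lastName) {}
    record UpdateUser(String firstName, String lastName) {}

    // Returns a copy of `existing` with every non-null field of `patch` applied.
    static User merge(User existing, UpdateUser patch) {
        return new User(
                existing.id(),
                patch.firstName() != null ? patch.firstName() : existing.firstName(),
                patch.lastName() != null ? patch.lastName() : existing.lastName());
    }
}
```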

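Repository

We do not take advantage of reactive programming at all here, but UserRepository extends ReactiveCosmosRepository. (CosmosRepository also exists, but its findAll returns an Iterable, which is awkward to use with the Stream API, so I went with the reactive variant for now.)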

UserRepository.java
package com.example.changefeedproducer.repositories;

import com.azure.spring.data.cosmos.repository.ReactiveCosmosRepository;
import com.example.changefeedproducer.models.User;

import org.springframework.stereotype.Repository;

@Repository
public interface UserRepository extends ReactiveCosmosRepository<User, String> {

}

DTO (Data Transfer Object)

UserDto.java
package com.example.changefeedproducer.dtos;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserDto {
    private String id;
    private String firstName;
    private String lastName;
}
CreateUserDto.java
package com.example.changefeedproducer.dtos;

import javax.validation.constraints.NotNull;

import org.hibernate.validator.constraints.Length;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CreateUserDto {
    @NotNull
    @Length(max = 50)
    private String firstName;
    @NotNull
    @Length(max = 50)
    private String lastName;
}

UpdateUserDto.java
package com.example.changefeedproducer.dtos;

import org.hibernate.validator.constraints.Length;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UpdateUserDto {
    @Length(max = 50)
    private String firstName;
    @Length(max = 50)
    private String lastName;
}

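Database model

The database model can be defined as simply and easily as with JPA. The partition key can also be specified on the model with the PartitionKey annotation, which is very convenient.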

User.java
package com.example.changefeedproducer.models;

import com.azure.spring.data.cosmos.core.mapping.Container;
import com.azure.spring.data.cosmos.core.mapping.PartitionKey;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import org.springframework.data.annotation.Id;

@Container(containerName = "users")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    @Id
    private String id;
    private String firstName;
    @PartitionKey
    private String lastName;

}
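
As a rough intuition for what the `@PartitionKey` annotation on `lastName` implies: CosmosDB hashes the partition key value to decide where an item lives, so all users with the same `lastName` end up together. The sketch below illustrates that idea with Java's own `String.hashCode`; the class name `PartitionSketch`, the method `bucketFor`, and the bucket count are assumptions for illustration, and this is not the hash function CosmosDB actually uses.

```java
// Illustration of hash-based routing; not CosmosDB's real hashing scheme.
class PartitionSketch {
    // Maps a partition key value to one of `bucketCount` buckets, deterministically.
    static int bucketFor(String partitionKeyValue, int bucketCount) {
        return Math.floorMod(partitionKeyValue.hashCode(), bucketCount);
    }
}
```

Two users named Yamada always map to the same bucket, which is why a partition key with many distinct values tends to spread data more evenly than one with only a few.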

Main file

ChangeFeedProducerApplication.java
package com.example.changefeedproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ChangeFeedProducerApplication {

	public static void main(String[] args) {
		SpringApplication.run(ChangeFeedProducerApplication.class, args);
	}

}

application.yml

Add the information needed to connect to CosmosDB. The values for endpoint and key can each be found under Azure portal => CosmosDB resource => Keys.
Screen Shot 2022-10-20 at 13.53.02.png

application.yml
spring:
  cloud:
    azure:
      cosmos:
        endpoint: https://azosmosdbname.documents.azure.com:443/
        key: od0eHTo8dyxaQO3MvTyFtpjYdFPuMJh6vjHPfYiddDt75Z9Pf60TGkWy1rOmZNopRHc1jgmgmsc727JgscyWsA==
        database: try_cosmos
        populate-query-metrics: true

Adding a record

curl -i -X POST -H "Content-Type: application/json" -d '{"firstName": "Taro", "lastName": "Yamada"}' http://localhost:8080/api/v1/users

If a response comes back with an id assigned, it worked.

HTTP/1.1 201 
Location: http://localhost:8080/api/v1/users/75a82daf-3a74-45d3-b616-0be3df87d11c
Content-Type: application/json
Transfer-Encoding: chunked
Date: Fri, 21 Oct 2022 05:25:10 GMT

{"id":"75a82daf-3a74-45d3-b616-0be3df87d11c","firstName":"Taro","lastName":"Yamada"}
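
The same request can also be sent from Java instead of curl. The following is a minimal sketch using the JDK's built-in `java.net.http` client (Java 11+); the class name `CreateUserRequestSketch` and the helper `buildRequest` are hypothetical, and the JSON is assembled with simple string formatting on the assumption that the names contain no characters that need escaping.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class CreateUserRequestSketch {
    // Builds the POST request; assumes the names need no JSON escaping.
    static HttpRequest buildRequest(String baseUrl, String firstName, String lastName) {
        String json = String.format(
                "{\"firstName\": \"%s\", \"lastName\": \"%s\"}", firstName, lastName);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/v1/users"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires the producer application to be running on localhost:8080.
        HttpResponse<String> response = HttpClient.newHttpClient().send(
                buildRequest("http://localhost:8080", "Taro", "Yamada"),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.headers().firstValue("Location").orElse("(none)"));
        System.out.println(response.body());
    }
}
```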

The newly added users record can also be seen in the Azure portal.
Screen Shot 2022-10-21 at 14.26.16.png

Consumer-side Spring Boot application

UsersChangeFeedProcessorConfig.java
package com.example.changefeedconsumer.config;

import java.time.Duration;
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.fasterxml.jackson.databind.JsonNode;

@Configuration
public class UsersChangeFeedProcessorConfig {

    private String endpointUri = "https://azosmosdbname.documents.azure.com:443/";
    private String accessKey = "od0eHTo8dyxaQO3MvTyFtpjYdFPuMJh6vjHPfYiddDt75Z9Pf60TGkWy1rOmZNopRHc1jgmgmsc727JgscyWsA==";
    private String hostName = "host-1";
    private String dbName = "try_cosmos";
    private String feedContainerId = "users";
    private String leaseContainerId = "lease";
    // List.of replaces the SDK-internal guava Lists helper, which is not public API.
    private List<String> preferredRegions = List.of("Japan East");

    @Bean
    public ChangeFeedProcessor changeFeedProcessor() {
        CosmosAsyncClient cosmosAsyncClient = new CosmosClientBuilder()
                // URI shown under CosmosDB => Keys
                .endpoint(endpointUri)
                // PRIMARY KEY shown under CosmosDB => Keys
                .key(accessKey)
                .preferredRegions(preferredRegions)
                .consistencyLevel(ConsistencyLevel.SESSION)
                // Must be true; otherwise it fails with "leaseClient: content response on write setting must be enabled".
                .contentResponseOnWriteEnabled(true)
                .buildAsyncClient();

        CosmosAsyncContainer feedContainer = cosmosAsyncClient.getDatabase(dbName).getContainer(feedContainerId);
        CosmosAsyncContainer leaseContainer = cosmosAsyncClient.getDatabase(dbName).getContainer(leaseContainerId);

        return new ChangeFeedProcessorBuilder()
                .hostName(hostName)
                .feedContainer(feedContainer)
                .leaseContainer(leaseContainer)
                .handleChanges(
                        (List<JsonNode> docs) -> {
                            for (JsonNode document : docs) {
                                // Process the change feed
                                System.out.println("Processing Change feed...");
                                System.out.println(document.toPrettyString());
                            }
                        })
                .buildChangeFeedProcessor();
    }

}
ChangeFeedConsumerApplication.java
package com.example.changefeedconsumer;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.azure.cosmos.ChangeFeedProcessor;

import lombok.RequiredArgsConstructor;

@SpringBootApplication
@RequiredArgsConstructor
public class ChangeFeedConsumerApplication implements CommandLineRunner {
	private final ChangeFeedProcessor changeFeedProcessor;

	public static void main(String[] args) {
		SpringApplication.run(ChangeFeedConsumerApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		changeFeedProcessor.start().block();
	}

}
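
UsersChangeFeedProcessorConfig above hardcodes the endpoint and key, which is fine for a throwaway experiment but risky once the code is shared. A minimal sketch of reading them from environment variables instead is shown below; the variable names `COSMOS_ENDPOINT` and `COSMOS_KEY`, and the `CosmosSettingsSketch` class itself, are assumptions for illustration and not something the SDK or Spring defines.

```java
import java.util.Map;

class CosmosSettingsSketch {
    record CosmosSettings(String endpoint, String key) {}

    // Takes the environment as a Map so it can be exercised without real variables.
    static CosmosSettings fromEnv(Map<String, String> env) {
        return new CosmosSettings(
                require(env, "COSMOS_ENDPOINT"),  // hypothetical variable name
                require(env, "COSMOS_KEY"));      // hypothetical variable name
    }

    // Fails fast with a clear message when a variable is missing or blank.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }
}
```

In the configuration class this could be called as `CosmosSettingsSketch.fromEnv(System.getenv())`, passing the resulting values to `endpoint(...)` and `key(...)` on the client builder.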

To run both Spring Boot applications locally at the same time, change the port number of one of them.

server:
  port: 7070

Once both the producer and consumer Spring Boot applications are running, send an HTTP request to the producer to add a new record to CosmosDB.

Producer
curl -i -X POST -H "Content-Type: application/json" -d '{"firstName": "Taro", "lastName": "Yamada"}' http://localhost:8080/api/v1/users
Producer
HTTP/1.1 201 
Location: http://localhost:8080/api/v1/users/8e52d8e9-e4b0-43b4-bf59-92ade5cefe0c
Content-Type: application/json
Transfer-Encoding: chunked
Date: Fri, 21 Oct 2022 05:25:10 GMT

{"id":"8e52d8e9-e4b0-43b4-bf59-92ade5cefe0c","firstName":"Taro","lastName":"Yamada"}

Once the new record has been added to CosmosDB, you can confirm that the consumer receives the Change Feed and prints it to the console.

Consumer
2022-10-21 14:42:12.128  INFO 94305 --- [oundedElastic-4] c.a.c.i.c.common.DefaultObserver : Start processing from thread 67
Processing Change feed...
{
  "id" : "8e52d8e9-e4b0-43b4-bf59-92ade5cefe0c",
  "firstName" : "First name",
  "lastName" : "Last name",
  "_rid" : "Pf5yALUpM3QEAAAAAAAAAA==",
  "_self" : "dbs/Pf5yAA==/colls/Pf5yALUpM3Q=/docs/Pf5yALUpM3QEAAAAAAAAAA==/",
  "_etag" : "\"18007e42-0000-2300-0000-635231340000\"",
  "_attachments" : "attachments/",
  "_ts" : 1666330932,
  "_lsn" : 101
}
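
Besides the user's own fields, the document carries system properties such as `_ts`, which holds the last-modified time in epoch seconds. As a small sketch, it can be converted to a readable timestamp with `java.time`; the class name `TimestampSketch` and the method `lastModified` are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class TimestampSketch {
    // Converts a _ts value (epoch seconds) into a date-time in the given zone.
    static ZonedDateTime lastModified(long ts, String zone) {
        return Instant.ofEpochSecond(ts).atZone(ZoneId.of(zone));
    }
}
```

For the `_ts` value 1666330932 shown above, `lastModified(1666330932L, "Asia/Tokyo")` gives 14:42:12 on 2022-10-21, which lines up with the timestamp of the consumer log line.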

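Conclusion

I was indeed able to receive the CosmosDB Change Feed. However, there is one problem I could not resolve this time. After the Change Feed is processed, an error occurs and an extremely long error message like the one below is printed to the console. Judging from its content, something seems to go wrong when communicating with Azure, but honestly I have no idea what, and I gave up. If anyone has a solution or a hunch, please let me know.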


2022-10-21 15:21:10.398 ERROR 98962 --- [oundedElastic-2] c.a.c.i.d.RntbdTransportClient           : Report this azure-cosmos-4.38.0.jar issue to ensure it is addressed:
[RntbdServiceEndpoint({"id":4,"closed":false,"concurrentRequests":1,"remoteAddress":"cdb-ms-prod-japaneast1-be12.documents.azure.com/<unresolved>:14007","channelPool":{"remoteAddress":"cdb-ms-prod-japaneast1-be12.documents.azure.com/<unresolved>:14007","isClosed":false,"configuration":{"maxChannels":130,"maxRequestsPerChannel":30,"idleConnectionTimeout":0,"readDelayLimit":65000000000,"writeDelayLimit":10000000000},"state":{"channelsAcquired":0,"channelsAvailable":1,"requestQueueLength":0}},"transportClient":{"id":1,"closed":false,"endpointCount":6,"endpointEvictionCount":0}})]
[com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.lambda$invokeStoreAsync$2(RntbdTransportClient.java:266)]
[request completed with an unexpected class java.util.concurrent.CancellationException: \{"record":AsyncRntbdRequestRecord({"args":{"transportRequestId":4394,"activityId":"8e182aea-5108-11ed-afe1-e5112e7ea2bd","origin":"rntbd://cdb-ms-prod-japaneast1-be12.documents.azure.com:14007","replicaPath":"/apps/b6a386ec-720b-44fc-95aa-6a03715c61c3/services/d646d164-7e64-4e0f-8da6-f1dc741dfb20/partitions/c7606e13-1dc6-4460-9b10-3d2ec4df2055/replicas/133107531143007136p","timeCreated":"2022-10-21T06:21:10.389746Z","lifetime":"PT0.006526125S"},"requestLength":463,"responseLength":-1,"status":{"done":false,"cancelled":false,"completedExceptionally":false},"timeline":[{"eventName":"created","startTimeUTC":"2022-10-21T06:21:10.389746Z","durationInMilliSecs":0.014},{"eventName":"queued","startTimeUTC":"2022-10-21T06:21:10.389760Z","durationInMilliSecs":0.001},{"eventName":"channelAcquisitionStarted","startTimeUTC":"2022-10-21T06:21:10.389761Z","durationInMilliSecs":2.219},{"eventName":"pipelined","startTimeUTC":"2022-10-21T06:21:10.391980Z","durationInMilliSecs":0.663},{"eventName":"transitTime","startTimeUTC":"2022-10-21T06:21:10.392643Z","durationInMilliSecs":3.658},{"eventName":"decodeTime","startTimeUTC":null,"durationInMilliSecs":0.0},{"eventName":"received","startTimeUTC":null,"durationInMilliSecs":0.0},{"eventName":"completed","startTimeUTC":null,"durationInMilliSecs":0.0}]}),"error":{"cause":null,"stackTrace":[{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"cancel","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.CompletableFuture","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoCompletionStage.java","lineNumber":56,"className":"reactor.core.publisher.MonoCompletionStage$1","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"
reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxDoFinally.java","lineNumber":134,"className":"reactor.core.publisher.FluxDoFinally$DoFinallySubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxContextWrite.java","lineNumber":141,"className":"reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":359,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate",
"fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":1022,"className":"reactor.core.publisher.FluxFlatMap$FlatMapInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":340,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":219,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribe","fileName":"FluxFlatMap.java","lineNumber":1083,"className":"reactor.core.publisher.FlatMapTracker","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":360,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoCollectList.java","lineNumber":144,"className":"reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","f
ileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMap.java","lineNumber":169,"className":"reactor.core.publisher.FluxMap$MapSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxTakeUntil.java","lineNumber":146,"className":"reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoIgnoreElements.java","lineNumber":104,"className":"reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxConcatArray.java","lineNumber":286,"className":"reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderN
ame":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":1022,"className":"reactor.core.publisher.FluxFlatMap$FlatMapInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":340,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribeEntry","fileName":"FluxFlatMap.java","lineNumber":219,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"unsubscribe","fileName":"FluxFlatMap.java","lineNumber":1083,"className":"reactor.core.publisher.FlatMapTracker","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxFlatMap.java","lineNumber":360,"className":"reactor.core.publisher.FluxFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"doOnCancel","fileName":"MonoSingle.java","lineNumber":108,"className":"reactor.core.publisher.MonoSingle$SingleSubscriber","nativeMethod":false},{"classLoaderName":"app","modul
eName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2784,"className":"reactor.core.publisher.Operators$MonoInnerProducerBase","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxPeekFuseable.java","lineNumber":159,"className":"reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"doOnCancel","fileName":"MonoSingle.java","lineNumber":108,"className":"reactor.core.publisher.MonoSingle$SingleSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2784,"className":"reactor.core.publisher.Operators$MonoInnerProducerBase","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVer
sion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxRetryWhen.java","lineNumber":163,"className":"reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"SerializedSubscriber.java","lineNumber":157,"className":"reactor.core.publisher.SerializedSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoPeekTerminal.java","lineNumber":144,"className":"reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classL
oaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":272,"className":"reactor.core.publisher.MonoFlatMap$FlatMapInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":188,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMapFuseable.java","lineNumber":176,"className":"reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMapFuseable.java","lineNumber":176,"className":"reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMap.java","lineNumber":187,"className":"reactor.core.publisher.MonoFlatMap$FlatMapMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainLoop","fileName":"Operators.java","lineNumber":2252,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleNam
e":null,"moduleVersion":null,"methodName":"drain","fileName":"Operators.java","lineNumber":2220,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"Operators.java","lineNumber":2032,"className":"reactor.core.publisher.Operators$MultiSubscriptionSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxRepeatWhen.java","lineNumber":136,"className":"reactor.core.publisher.FluxRepeatWhen$RepeatWhenMainSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"SerializedSubscriber.java","lineNumber":157,"className":"reactor.core.publisher.SerializedSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoNext.java","lineNumber":114,"className":"reactor.core.publisher.MonoNext$NextSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"set","fileName":"Operators.java","lineNumber":1160,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMergeSequential.java","lineNumber":601,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialInner","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancelAll","fileName":"FluxMergeSequential.java","lineNumber":285,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"drainAndCancel","fileName":"FluxMergeSequential.java","lineNumber":276,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialMain",
"nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMergeSequential.java","lineNumber":270,"className":"reactor.core.publisher.FluxMergeSequential$MergeSequentialMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxMap.java","lineNumber":169,"className":"reactor.core.publisher.FluxMap$MapSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"terminate","fileName":"Operators.java","lineNumber":1240,"className":"reactor.core.publisher.Operators","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"MonoFlatMapMany.java","lineNumber":131,"className":"reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"cancel","fileName":"FluxPublishOn.java","lineNumber":277,"className":"reactor.core.publisher.FluxPublishOn$PublishOnSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"onNext","fileName":"FluxLimitRequest.java","lineNumber":103,"className":"reactor.core.publisher.FluxLimitRequest$FluxLimitRequestSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"runAsync","fileName":"FluxPublishOn.java","lineNumber":440,"className":"reactor.core.publisher.FluxPublishOn$PublishOnSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"run","fileName":"FluxPublishOn.java","lineNumber":527,"className":"reactor.core.publisher.FluxPublishOn$PublishOnSubscriber","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"call","fileName":"WorkerTask.java","lineNumber":84,"className":"reactor.core.scheduler.WorkerT
ask","nativeMethod":false},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"call","fileName":"WorkerTask.java","lineNumber":37,"className":"reactor.core.scheduler.WorkerTask","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.FutureTask","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"runWorker","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.ThreadPoolExecutor","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.util.concurrent.ThreadPoolExecutor$Worker","nativeMethod":false},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"17.0.4.1","methodName":"run","fileName":null,"lineNumber":-1,"className":"java.lang.Thread","nativeMethod":false}],"message":null,"suppressed":[],"localizedMessage":null}}: com.azure.core.exception.AzureException
        at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter.doReportIssue(RntbdReporter.java:66)
        at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter.reportIssue(RntbdReporter.java:43)
        at com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.lambda$invokeStoreAsync$2(RntbdTransportClient.java:266)
        at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3776)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
        at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:89)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.whenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.whenComplete(Unknown Source)
        at reactor.core.publisher.MonoCompletionStage.subscribe(MonoCompletionStage.java:67)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4455)
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4570)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4422)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4358)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4330)
        at com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.lambda$invokeStoreAsync$5(RntbdTransportClient.java:329)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:146)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:135)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:359)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
        at reactor.core.publisher.Operators.terminate(Operators.java:1240)
        at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
        at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
        at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.cancel(MonoCollectList.java:144)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
        at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
        at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.cancel(FluxTakeUntil.java:146)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.cancel(MonoIgnoreElements.java:104)
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.cancel(FluxConcatArray.java:286)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
        at reactor.core.publisher.Operators.terminate(Operators.java:1240)
        at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
        at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
        at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnCancel(MonoSingle.java:108)
        at reactor.core.publisher.Operators$MonoInnerProducerBase.cancel(Operators.java:2784)
        at reactor.core.publisher.Operators.terminate(Operators.java:1240)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
        at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnCancel(MonoSingle.java:108)
        at reactor.core.publisher.Operators$MonoInnerProducerBase.cancel(Operators.java:2784)
        at reactor.core.publisher.Operators.terminate(Operators.java:1240)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.cancel(FluxRetryWhen.java:163)
        at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:144)
        at reactor.core.publisher.Operators.terminate(Operators.java:1240)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
        at reactor.core.publisher.Operators.terminate(Operators.java:1240)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.cancel(MonoFlatMap.java:272)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:188)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176)
        at reactor.core.publisher.Operators.terminate(Operators.java:1240)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:187)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
        at reactor.core.publisher.FluxRepeatWhen$RepeatWhenMainSubscriber.cancel(FluxRepeatWhen.java:136)
        at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
        at reactor.core.publisher.MonoNext$NextSubscriber.cancel(MonoNext.java:114)
        at reactor.core.publisher.Operators.set(Operators.java:1160)
        at reactor.core.publisher.FluxMergeSequential$MergeSequentialInner.cancel(FluxMergeSequential.java:601)
        at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.cancelAll(FluxMergeSequential.java:285)
        at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drainAndCancel(FluxMergeSequential.java:276)
        at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.cancel(FluxMergeSequential.java:270)
        at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
        at reactor.core.publisher.Operators.terminate(Operators.java:1240)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:131)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.cancel(FluxPublishOn.java:277)
        at reactor.core.publisher.FluxLimitRequest$FluxLimitRequestSubscriber.onNext(FluxLimitRequest.java:103)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
]