0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Couchbase ServerリアクティブAPI解説(JAVA)

Last updated at Posted at 2021-07-09

Couchbase ServerリアクティブAPI概要

Couchbase Server JAVA SDKのリアクティブAPIは、その基盤としてReactorライブラリを使用しています。

Project Reactor

Reactor 3 Reference Guide

そのため、Reactorライブラリのクラス、MonoFluxの利用方法を基本として押さえておく必要があります。ここでは、省略して、以下のように整理しておきます。

  • Mono<T> 1つ(またはゼロ)の結果が期待される場合に利用
  • Flux<T> 複数の結果が期待される場合に利用

リアクティブAPIへのアクセス

下記のように、ブロッキングAPIで利用される通常のクラスから、reactive()メソッドを介してリアクティブなAPIへアクセスすることのできる対応物を得ることができます。

    Cluster cluster = Cluster.connect("127.0.0.1", "Administrator", "password");
    ReactiveCluster reactiveCluster = cluster.reactive();

    Bucket bucket = cluster.bucket("travel-sample");
    ReactiveBucket reactiveBucket = bucket.reactive();

    Collection collection = bucket.defaultCollection();
    ReactiveCollection reactiveCollection = collection.reactive();

寄り道:低レベル非同期APIについて

実は、ブロッキングAPIとリアクティブAPIはどちらも、下位レベルの非同期API基盤上に構築されています(具体的な実装としてJDK1.8以降に含まれるCompletableFutureが利用されています)。

下記のように、このAPIにアクセスするためのアクセサメソッドasync()が提供されています。

    AsyncCluster asyncCluster = cluster.async();
    AsyncBucket asyncBucket = bucket.async();
    AsyncCollection asyncCollection = collection.async();

低レベル非同期APIを使うことで、リアクティブAPIでは実現できない同時実行制御や、性能チューニングを実行できる余地がありますが、特別なニーズがない限り、リアクティブAPIの表現力を生かして、より効率的かつ安定した開発を行うことができます。

コードを用いた解説

最も単純なバルクフェッチは次のようになります(エラー処理なし)。

List<String> docsToFetch = Arrays.asList("airline_10123", "airline_10226", "airline_10642");
List<GetResult> results = Flux.fromIterable(docsToFetch).flatMap(reactiveCollection::get).collectList().block();

このコードは、フェッチするキーのリストを作り、それをReactiveCollection#get(String)に渡しています。フェッチ処理は非同期で発生するため、結果はサーバークラスターから返される順序で返されます(つまり元のリストの順序とは異なる可能性があります)。block()メソッドにより、すべての結果が収集されるまで待機します。このブロッキング処理はオプションですが、リアクティブコードとブロッキングコードを組み合わせて、単純さのメリットを享受できることを示しています。

このコードは、単純ですが、1つの大きな欠点があります。各ドキュメントフェッチの際に発生しうるエラーは、ストリーム全体を失敗させる結果につながります。場合によっては、そのようなケースが意図されることもあるかもしれませんが、ほとんどの場合、個々の障害を無視するか、少なくとも失敗としてマークする必要があります。

個々のエラーを無視する方法は次のとおりです。

List<String> docsToFetch = Arrays.asList("airline_10748", "airline_10765", "airline_109");
List<GetResult> results = Flux.fromIterable(docsToFetch)
          .flatMap(key -> reactiveCollection.get(key).onErrorResume(e -> Mono.empty())).collectList().block();

成功と失敗を区別したい場合に取りうる1つの方法は「副作用」を許容することです。関数外の要素との関係することにより、純粋な関数型プログラミングの観点からは「副作用」があるコードとみなされますが、問題なく機能します。ただし、リアクティブ(非同期)関数からアクセスされるオブジェクトには、スレッドセーフなクラスを使用することを忘れてはなりません。

List<String> docsToFetch = Arrays.asList("airline_112", "airline_1191", "airline_1203");

List<GetResult> successfulResults = Collections.synchronizedList(new ArrayList<>());
Map<String, Throwable> erroredResults = new ConcurrentHashMap<>();

Flux.fromIterable(docsToFetch).flatMap(key -> reactiveCollection.get(key).onErrorResume(e -> {
        erroredResults.put(key, e);
        return Mono.empty();
      })).doOnNext(successfulResults::add).last().block();

処理が成功した場合は、副作用を許容するメソッドであるdoOnNextを使用して取得結果をsuccessfulResultsに格納しています。失敗した場合には、erroredResultsマップにドキュメントのキーと例外オブジェクトを格納していますが、シーケンス全体としては無視して処理を継続しています。

さらに、個々の障害発生に対して、再試行処理を組み込むことも可能です。

List<String> docsToFetch = Arrays.asList("airline_1316", "airline_13391", "airline_1355");

List<GetResult> results = Flux.fromIterable(docsToFetch)
          .flatMap(key -> reactiveCollection.get(key).retryWhen(Retry.backoff(10, Duration.ofMillis(10)))).collectList()
          .block();

Reactorのアドオンライブラリであるreactor-extraに含まれるreactor.retryを利用しています。

他のリアクティブライブラリとの組み合わせ

Couchbase Server SDKが用いているreactorライブラリは、非同期ストリーム処理の標準を提供するReactive Streamsをサポートしており、最小限のハードルで他のリアクティブライブラリと組み合わせることができます。

例えば、アプリケーションスタックがRxJava上に構築されている場合、まず、以下のように、ReactorAdapterライブラリをプロジェクトに含めます。

<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-adapter</artifactId>
    <version>3.2.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.0</version>
</dependency>

ReactorAdapterライブラリを使用して、rxjavareactorの間で相互に変換できます。次のコードはMono<GetResult>を、RxJavaのSingle<GetResult>に変換しています。

    Single<GetResult> rxSingleResult = monoToSingle(reactiveCollection.get("airline_10"));

最後に

ここで紹介したコードは、データベース(Couchbase Server)から取得した結果をすぐに、block()を介して、リスト化してしまっています。Couchbase ServerのSDKは、ブートストラップ時にクラスターのトポロジーを把握した上で、各ドキュメントのキーに基づいて、そのドキュメントを管理するノードに対して、直接アクセスします。そのため、データベース/クラスターとのアクセスにおいて、非同期処理により効率化の恩恵を被っているといえます。実践においては、取得したデータの利用方法に応じて、block()を呼ばずに、継続して処理を実装することが考えられます。

より実践的なリアクティブプログラミングについては、関連情報のブログ記事などをご参考ください。

関連情報

Async & Reactive APIs

Spring WebFluxのリアクティブプログラミング実装サンプル(基本編)

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?