0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Road to Cassandra Day】Cassandra リアクティブ スタイル プログラミング~高度なトピック③クエリ失敗からの再試行

Last updated at Posted at 2023-05-19

はじめに

Cassandra Day Tokyo

今年、2023年6月1日に、Cassandra Dayが日本でも開催されます。
Cassandra Dayは、昨年、ベルリン、ロンドン、アムステルダム、ハノイ、ジャカルタ、ヒューストン、サンタクララ、シアトル、シンガポールでも開催されました。

今回の東京での開催に向けて、Apache Cassandraに関する記事を発表していきます。

image.png

Apache Cassandraについて

Apache Cassandraとは、一言でいうなら、オープンソースの分散データベース管理システムです。

他の分散データベース管理システム同様、複数の汎用サーバーを用いて、ひとつのデータベースを構築します(開発などの目的のため、一つのサーバーのみで構成することも可能です)。

ここでは、詳しい説明は割愛し、興味のある方へのご紹介の役割は、公式サイトやWikipediaに譲ります。

Cassandra リアクティブ スタイル プログラミング

本稿の内容は、以下のドキュメントを元にしています。

Cassandra リアクティブ スタイル プログラミングの紹介と基本的な使い方については、本稿執筆者による以下の記事を参照ください。

高度なトピック

クエリ失敗からの再試行

リアクティブ スタイルのプログラミングでは、クエリの実行に失敗するとonErrorシグナルがトリガーされ、サブスクリプションがすぐに終了され、後続のクエリがまったく実行されなくなる可能性があります。

この動作が望ましくない場合は、フェイルセーフ システムの動作を模倣することができます。これには通常、onErrorReturnまたはonErrorResumeなどの演算子が使用されます。

次の例では、実行が失敗するたびに、スタック トレースが標準エラーに出力されます。onErrorResumeオペレーターがあるため、エラーは無視され、フローの実行が再開されます。

Flux<Statement<?>> stmts = ...;
stmts.flatMap(
    statement ->
        Flux.from(session.executeReactive(statement))
            .doOnError(Throwable::printStackTrace)
            .onErrorResume(error -> Mono.empty()))
    .blockLast();

一目見て分かる通り、上記の実装はあまりに素朴すぎます。

次の例は、前の例を拡張したものです。失敗した実行ごとに、エラーがUnavailableExceptionの場合は最大 3 回の再試行が試行され、再試行後にクエリが成功しなかった場合は、メッセージがログに記録されます。最後に、すべてのエラーが収集され、失敗したクエリの総数がコンソールに出力されます。

Flux<Statement<?>> statements = ...;
long failed = statements.flatMap(
    stmt ->
        Flux.defer(() -> session.executeReactive(stmt))
            // retry at most 3 times on Unavailable
            .retry(3, UnavailableException.class::isInstance)
            // handle errors
            .doOnError(
                error -> {
                  System.err.println("Statement failed: " + stmt);
                  error.printStackTrace();
                })
            // Collect errors and discard all returned rows
            .ignoreElements()
            .cast(Long.class)
            .onErrorReturn(1L))
    .sum()
    .block();
System.out.println("Total failed queries: " + failed);

上記の例では、Flux.defer()が、session.executeReactive()への呼び出しをラップするために使用しています。
ドライバーは常に単一サブスクリプションのみのパブリッシャーを作成するため、これが必要です。

deferオペレーターが用いられない場合、このような単一サブスクリプションのみをサポートするパブリッシャーは、retryのようなオペレーターと互換性がありません。
それは、こうしたオペレーターが上流のパブリッシャーに複数回サブスクライブし、ドライバーが例外をスローする場合があるためです。
これが、それがまさにdeferオペレーターが設計された目的です。deferオペレーターへの各サブスクリプションは session.executeReactive()への個別の呼び出しをトリガーし、セッションがクエリを再実行・再試行するたびに新しいパブリッシャーを返します。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?