はじめに
Cassandra Day Tokyo
今年、2023年6月1日に、Cassandra Dayが日本でも開催されます。
Cassandra Dayは、昨年、ベルリン、ロンドン、アムステルダム、ハノイ、ジャカルタ、ヒューストン、サンタクララ、シアトル、シンガポールでも開催されました。
今回の東京での開催に向けて、Apache Cassandraに関する記事を発表していきます。
Apache Cassandraについて
Apache Cassandraとは、一言でいうなら、オープンソースの分散データベース管理システムです。
他の分散データベース管理システム同様、複数の汎用サーバーを用いて、ひとつのデータベースを構築します(開発などの目的のため、一つのサーバーのみで構成することも可能です)。
ここでは、詳しい説明は割愛し、興味のある方へのご紹介の役割は、公式サイトやWikipediaに譲ります。
Cassandra リアクティブ スタイル プログラミング
本稿の内容は、以下のドキュメントを元にしています。
Cassandra リアクティブ スタイル プログラミングの紹介と基本的な使い方については、本稿執筆者による以下の記事を参照ください。
高度なトピック
クエリ失敗からの再試行
リアクティブ スタイルのプログラミングでは、クエリの実行に失敗するとonError
シグナルがトリガーされ、サブスクリプションがすぐに終了され、後続のクエリがまったく実行されなくなる可能性があります。
この動作が望ましくない場合は、フェイルセーフ システムの動作を模倣することができます。これには通常、onErrorReturn
またはonErrorResume
などの演算子が使用されます。
次の例では、実行が失敗するたびに、スタック トレースが標準エラーに出力されます。onErrorResume
オペレーターがあるため、エラーは無視され、フローの実行が再開されます。
Flux<Statement<?>> stmts = ...;
stmts.flatMap(
statement ->
Flux.from(session.executeReactive(statement))
.doOnError(Throwable::printStackTrace)
.onErrorResume(error -> Mono.empty()))
.blockLast();
一目見て分かる通り、上記の実装はあまりに素朴すぎます。
次の例は、前の例を拡張したものです。失敗した実行ごとに、エラーがUnavailableException
の場合は最大 3 回の再試行が試行され、再試行後にクエリが成功しなかった場合は、メッセージがログに記録されます。最後に、すべてのエラーが収集され、失敗したクエリの総数がコンソールに出力されます。
Flux<Statement<?>> statements = ...;
long failed = statements.flatMap(
stmt ->
Flux.defer(() -> session.executeReactive(stmt))
// retry at most 3 times on Unavailable
.retry(3, UnavailableException.class::isInstance)
// handle errors
.doOnError(
error -> {
System.err.println("Statement failed: " + stmt);
error.printStackTrace();
})
// Collect errors and discard all returned rows
.ignoreElements()
.cast(Long.class)
.onErrorReturn(1L))
.sum()
.block();
System.out.println("Total failed queries: " + failed);
上記の例では、Flux.defer()
が、session.executeReactive()
への呼び出しをラップするために使用しています。
ドライバーは常に単一サブスクリプションのみのパブリッシャーを作成するため、これが必要です。
defer
オペレーターが用いられない場合、このような単一サブスクリプションのみをサポートするパブリッシャーは、retry
のようなオペレーターと互換性がありません。
それは、こうしたオペレーターが上流のパブリッシャーに複数回サブスクライブし、ドライバーが例外をスローする場合があるためです。
これが、それがまさにdefer
オペレーターが設計された目的です。defer
オペレーターへの各サブスクリプションは session.executeReactive()
への個別の呼び出しをトリガーし、セッションがクエリを再実行・再試行するたびに新しいパブリッシャーを返します。