はじめに
本稿は、Couchbase Java SDKでの分散トランザクションにおけるエラー処理を理解するための情報を提供することを目的としています。
そのために、まずCouchbaseの分散トランザクションにおける中心となるコンセプトについての解説から始めます。
本記事の先行テキストとして以下をご参照ください。
Couchbase Java SDK解説:分散トランザクションプログラミング入門
コミットとロールバック
コミット
まず、Couchbaseの分散トランザクションでは、ctx.commit()
により、トランザクションのコミットを明示的にプログラミングすることができます。
また、Couchbaseの分散トランザクションでは、コミットは自動的に行われます。トランザクションロジックコールバック(ラムダブロック)の最後にctx.commit()
の明示的な呼び出しがなくても、例外がスローされない限り、コミットが実行されます。
トランザクションがコミットポイントに到達すると、アプリケーションがクラッシュした場合でも、トランザクションが完全にコミットされます。これは、非同期クリーンアッププロセスにより実現されます(非同期クリーンアッププロセスについては、別に解説します)。
トランザクションがコミットされるとすぐに、すべての変更が他のトランザクションからの読み取りにアトミックに表示されます。変更はコミットされるため(つまり、もはやステージングされていないため)、非トランザクションのアクターに結果整合性のある方法で表示されます。
当然ながら、トランザクションがコミットされた後は、ロールバックできません。
非同期API利用時の注意
非同期APIでは、commit
の明示的な呼び出しを省略した場合、下記のようにthen()
をコールすることにより、チェーンの結果として、戻り値のタイプをMono<Void>
に変換する必要がある場合があります。
Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
return ctx.get(collection.reactive(), "anotherDoc").flatMap(doc -> {
JsonObject content = doc.contentAs(JsonObject.class);
content.put("transactions", "are awesome");
return ctx.replace(doc, content);
}).then();
});
ロールバック
ラムダブロックから、トランザクションライブラリまたはアプリケーションロジックのいずれかによって例外がスローされた場合、その試行はロールバックされます。
トランザクションロジックは、例外の種類に応じて再試行される場合とされない場合があります。
トランザクションが再試行されない場合、TransactionFailed
例外がスローされ、そのgetCause
メソッドを使用して失敗の詳細を確認できます。
アプリケーションはこれを使用して、ロールバックをトリガーした理由を次のように通知できます。
class BalanceInsufficient extends RuntimeException {
}
try {
transactions.run((ctx) -> {
TransactionGetResult customer = ctx.get(collection, "customer-name");
if (customer.contentAsObject().getInt("balance") < costOfItem) {
throw new BalanceInsufficient();
}
// トランザクション処理の継続
});
} catch (TransactionCommitAmbiguous e) {
// この例外は、コミットポイントでのみスローされます。
System.err.println("Transaction possibly committed");
for (LogDefer err : e.result().log().logs()) {
System.err.println(err.toString());
}
} catch (TransactionFailed e) {
// この例外は、Balance Insufficientがスローされたことにより、発生した可能性があります。
if (e.getCause() instanceof BalanceInsufficient) {
// 例外の再スロー
throw (RuntimeException) e.getCause();
} else {
System.err.println("Transaction did not reach commit point");
for (LogDefer err : e.result().log().logs()) {
System.err.println(err.toString());
}
}
}
トランザクションは明示的にロールバックすることもできます。
transactions.run((ctx) -> {
TransactionGetResult customer = ctx.get(collection, "customer-name");
if (customer.contentAsObject().getInt("balance") < costOfItem) {
ctx.rollback();
}
// トランザクション処理の継続
});
この場合、ctx.rollback()
に達すると、トランザクションは正常にロールバックされたと見なされ、TransactionFailed
はスローされません。
トランザクションがロールバックされた後は、コミットできず、それ以上の操作は許可されず、ライブラリはコードブロックの最後でトランザクションを自動的にコミットしようとしません。
Couchbase固有のコンセプト
(アン)ステージング
トランザクションスコープ内では、変更されたドキュメントは、そのトランザクションスコープ外におけるドキュメントとは、別のものとして扱われます。これをステージングと呼びます。
そのトランザクションがコミットポイントに達した後、ステージングされたドキュメントは、そのトランザクションスコープ外のドキュメントに反映されます。これをアンステージングと呼びます。
エラー処理
概要
トランザクションライブラリでは、トランザクションの中で発生したエラーは例外としてアプリケーションに通知されます。
エラーには、次のようなものがあります。
- 存在しないドキュメントキーでドキュメントを取得
- すでに存在するドキュメントを挿入
- 存在しないドキュメントを削除または置換
- トランザクションラムダブロック内のアプリケーションロジックからスローされた例外
障害が発生すると、このトランザクションで試行された他のすべての操作は即座に失敗します。
トランザクションは、マルチステージおよびマルチドキュメントからなるため、部分的な成功/失敗の概念もあります。これは後で説明するTransactionResult.unstagingComplete()
でアプリケーションに通知されます。
例外クラス
トランザクションライブラリから、アプリケーションに通知される以下の3つの例外があります。
TransactionFailed
TransactionExpired
TransactionCommitAmbiguous
すべての例外は、TransactionFailed
から派生しています。
TransactionFailed
およびTransactionExpired
これらの例外が通知される時、そのトランザクションは間違いなくコミットポイントに到達していません。
TransactionExpired
は、有効期限に達するまで再試行が行われたことを示しますが、通常、アプリケーションにとってTransactionFailed
との区別は、重要ではなく、特にTransactionExpired
発生時に固有のロジック個別を用いる必要はありません。(そのため、下記のコードでは、TransactionExpired
クラスの親クラスであるTransactionFailed
のみをキャッチしています)
どちらの場合も、すべての変更をロールバックします。
TransactionCommitAmbiguous
トランザクションが期限切れになる前に、操作が成功したことが100%確実に分かるとは限りません。この場合、ライブラリはTransactionCommitAmbiguous
例外を発生させて、この状態を通知します。
実際上、この例外が発生することは稀だといえます。しかしながら、一時的なネットワーク障害その他の理由により、クライアントにおけるプロトコルとして、状態を判別できない状況が考慮された設計となっています。
この時、トランザクションがコミットポイントに到達した場合と到達しなかった場合、が考えられ、それぞれのケースについて、理解しておくことは重要です。
トランザクションが実際にコミットポイントに正常に到達した場合、トランザクションは将来のある時点で、(非同期で実行される)クリーンアッププロセスによって完全に完了されます。デフォルト設定では、これは通常1分以内ですが、TransactionCommitAmbiguous
が発生した原因によっては、さらに時間がかかる可能性もあります。
トランザクションが実際にコミットポイントに到達していなかった場合、非同期クリーンアッププロセスは、将来のある時点でトランザクションをロールバックしようとします。ロールバックが完了するまでの期間であっても、ドキュメントへの書き込みがデッドロックとならない(長期間ブロックしない)ようにする安全のためのメカニズムがあり、またトランザクションのためにステージングされたメタデータは、トランザクション管理以外の目的で利用されないため、問題は発生しません。
例外の対応
TransactionCommitAmbiguous
例外をキャッチした際には、あるいはアプリケーションによって同じ内容のトランザクションを再実行するというロジックが採用できる場合もあるかもしれませんが、TransactionFailed
同様に、こうしたエラーが発生する際には、アプリケーションロジックによって処理できる場合ばかりとは限りません。代替策として、原因調査のため、ログを書き込むことが考えられます。頻繁に同様な状況が発生している場合など、トランザクションの有効期限を延長して、プロトコル上の曖昧さを解決するため時間を延長することが考えられます。
トランザクション有効期限の変更
トランザクションの有効期限(デフォルトは15秒)を次の方法で延長することができます。
Transactions transactions = Transactions.create(cluster,
TransactionConfigBuilder.create().expirationTime(Duration.ofSeconds(120)).build());
これにより、プロトコルが一時的な障害(たとえば、クラスターのリバランスによって引き起こされる障害)を乗り越えるための時間を増やすことができます。有効期限が長い場合に考慮すべきトレードオフは、トランザクションによってステージングされたドキュメントが、有効期限を超えるまで、他のトランザクションからの変更に対してロックされることです。
有効期限が正確に守られるとは限らないことに注意してください。たとえば、アプリケーションがラムダ内で長いブロッキング操作を実行する場合(これは回避する必要があります)、有効期限はこれが終了した後にのみトリガーできます。また、トランザクションが有効期限に近い時点で、Key-Value操作を試行し、そのKey-Value操作がタイムアウトした場合にも、有効期限を超える可能性があります。
アンステージング完了の確認:TransactionResult.unstagingComplete()
トランザクションのコミットポイントはアトミックです。このアトミックコミットポイントに達した後に、ドキュメント自体は個別にコミットされます(これを「アンステージング」と呼びます)。
TransactionResult.unstagingComplete()
メソッドは、ステージング解除プロセスが正常に完了したかどうかを示します。これは、アプリケーションの中で、トランザクションの結果を、非トランザクションアクター(例えば、非トランザクションのN1QLやKey-Value読み取り)が利用する場合に使用する必要があります。
トランザクションを認識するアクターは、トランザクション後のバージョンのドキュメントを返します。したがって、アプリケーションがトランザクション対応のアクターのみを利用している場合、ステージング解除プロセスはオプションです。
包括的なエラー処理の例
エラー処理は、トランザクションのコミット(またはロールバック)のポイントの前か後かによって異なります。
上記で解説した内容を用いて、コードのサンプルを以下に示します。
try {
TransactionResult result = transactions.run((ctx) -> {
// ... transactional code here ...
});
// ここでは、トランザクションは確実にコミットポイントに到達しています。
// 個々のドキュメントのアンステージングは、完了している場合と完了していない場合があります
if (result.unstagingComplete()) {
// 非トランザクションアクターを使用した操作では、unstagingComplete()がtrueである必要があります。
// 下記で利用されている、「result.mutationState()」は、トランザクション内で、
// KV操作のみが行われている場合にのみ有効であることに注意してください。
// N1QL操作がトランザクション内で行われている場合には、利用することができません。
cluster.query(" ... N1QL ... ", QueryOptions.queryOptions().consistentWith(result.mutationState()));
String documentKey = "a document key involved in the transaction";
GetResult getResult = collection.get(documentKey);
} else {
// このステップは完全にアプリケーションに依存します。
// この時点でresult.unstagingComplete()がtrueであることが重要な場合は、
// 独自の例外をスローする必要がある場合があります。
// この時点で、ステージング解除が完了していなくても、非同期クリーンアッププロセスが、
// 後でステージング解除を完了します。
}
} catch (TransactionCommitAmbiguous err) {
// ここでは、トランザクションがコミットポイントに到達した場合と到達しなかった場合があります
System.err.println("Transaction returned TransactionCommitAmbiguous and" + " may have succeeded, logs:");
err.result().log().logs().forEach(log -> System.err.println(log.toString()));
} catch (TransactionFailed err) {
// トランザクションは間違いなくコミットポイントに到達しませんでした
System.err.println("Transaction failed with TransactionFailed, logs:");
err.result().log().logs().forEach(log -> System.err.println(log.toString()));
}