はじめに
Couchbase Serverの分散トランザクションにおけるトランザクションライフサイクルは、基本的にRDB/SQLと同様のものとして理解することができますが、アプリケーション開発において利用するにあたって、独自の考慮点もあります。
-
Couchbase Serverにおけるトランザクションは、クラスターへの処理リクエスト実行レベルで選択することのできるオプションです。つまり、アプリケーションには、トランザクションアクターと非トランザクションアクターが存在します。
-
また、このことを別の視点から言い換えると、Couchbase Serverにおけるトランザクションは、SDK/クライアントが主体となって実現される機能であるといえます(中央集権的なプロセスは存在せず、そこから起因する性能影響もありません)。
トランザクションライフサイクル
トランザクションの初期化
Transactions
オブジェクトの構築により、バックグラウンドプロセスが自動的に実行され、スレッドプールを含むいくつかのリソースが使用されます。
そのため、アプリケーションでは、Transactions
オブジェクトがひとつだけ作成されるようにすることが非常に重要です。
// Initialize the Couchbase cluster
Cluster cluster = Cluster.connect("localhost", "username", "password");
Bucket bucket = cluster.bucket("travel-sample");
Scope scope = bucket.scope("inventory");
Collection collection = scope.collection("airport");
// Create the single Transactions object
Transactions transactions = Transactions.create(cluster);
複数のTransactions
オブジェクト利用ケース
通常、アプリケーションに必要なTransactions
オブジェクトは1つだけです。ライブラリーは、ふたつ以上のオブジェクトが作成されると警告を発します。
アプリケーションが複数のTransactions
オブジェクトを作成する必要があるというまれな例外がひとつあります。それは、カスタムメタデータコレクションを利用する場合です。
コミット
コミットは自動的に行われます。トランザクションロジックコールバックの最後にctx.commit()
の明示的な呼び出しがなく、例外がスローされない場合、コミットされます。
非同期APIでは、commit()
の明示的な呼び出しを省略した場合、メソッドチェーンの結果として、.then()
を呼び出して、必要なMono<Void>
タイプの戻り値に変換する必要がある場合があります。
Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
return ctx.get(collection.reactive(), "anotherDoc").flatMap(doc -> {
JsonObject content = doc.contentAs(JsonObject.class);
content.put("transactions", "are awesome");
return ctx.replace(doc, content);
}).then();
});
トランザクションがコミットされるとすぐに、そのトランザクションのすべての変更が他のトランザクションからの読み取りに対して、アトミックに表示されます。変更はコミット(または「アンステージング」)されるため、非トランザクションのアクターに結果整合性のある方法で表示されます。
コミットは最終的なものであり、トランザクションがコミットされた後は、ロールバックできず、それ以上の操作は許可されません。
トランザクションがコミットポイントに既に到達している場合、直後にアプリケーションがクラッシュした場合でも、非同期クリーンアッププロセスにより、トランザクションが事後的にコミットされます。
非トランザクション書き込みとの同時実行時の考慮
アプリケーションは、同じドキュメントに対して、非トランザクション書き込みがトランザクション書き込みと同時に実行されないようにする必要があります。
この要件は、Key-Value操作のパフォーマンスが損なわれないようにするためのものです。Couchbase Serverのトランザクションにおける重要な哲学は、「使用したものに対してのみ支払う」ということです。
もしも、このような2つ種類の書き込みが競合した場合、トランザクション書き込みは「勝ち」、非トランザクション書き込みを上書きします。
これは書き込みにのみ適用されることに注意してください。トランザクションと同時に行われる非トランザクション読み取りはすべて問題なく、リードコミットレベルの分離性でコントロールされます。
上記の協調に関する要件が満たされていることを検出するために、アプリケーションはクライアントのイベントロガーをサブスクライブし、次のようにIllegalDocumentState
イベントをチェックできます。
cluster.environment().eventBus().subscribe(event -> {
if (event instanceof IllegalDocumentState) {
// log this event for review
}
});
このイベントは、非トランザクション書き込みが、トランザクション書き込みにオーバーライドされたことが検出された場合に発生します。イベントには、アプリケーションのデバッグを支援するために、関連するドキュメントのキーが含まれています。
ロールバック
トランザクションラムダブロックから(トランザクションまたはアプリケーションライブラリーのいずれかによって)例外がスローされた場合、その試行はロールバックされます。トランザクションロジックは、例外に応じて再試行される場合とされない場合があります。
トランザクションが再試行されない場合、TransactionFailed
例外がスローされ、そのgetCause
メソッドを使用して失敗の詳細を確認できます。
アプリケーションはこの例外を使用して、ロールバックをトリガーした理由を次のように通知できます。
class BalanceInsufficient extends RuntimeException {
}
try {
transactions.run((ctx) -> {
TransactionGetResult customer = ctx.get(collection, "customer-name");
if (customer.contentAsObject().getInt("balance") < costOfItem) {
throw new BalanceInsufficient();
}
// else continue transaction
});
} catch (TransactionCommitAmbiguous e) {
// This exception can only be thrown at the commit point, after the
// BalanceInsufficient logic has been passed, so there is no need to
// check getCause here.
System.err.println("Transaction possibly committed");
for (LogDefer err : e.result().log().logs()) {
System.err.println(err.toString());
}
} catch (TransactionFailed e) {
if (e.getCause() instanceof BalanceInsufficient) {
// Re-raise the error
throw (RuntimeException) e.getCause();
} else {
System.err.println("Transaction did not reach commit point");
for (LogDefer err : e.result().log().logs()) {
System.err.println(err.toString());
}
}
}
トランザクションは明示的にロールバックすることもできます。
transactions.run((ctx) -> {
TransactionGetResult customer = ctx.get(collection, "customer-name");
if (customer.contentAsObject().getInt("balance") < costOfItem) {
ctx.rollback();
}
// else continue transaction
});
この場合、ctx.rollback()
に達すると、トランザクションは正常にロールバックされたと見なされ、TransactionFailed
例外はスローされません。
トランザクションがロールバックされた後は、コミットできず、それ以上の操作は許可されず、ライブラリはコードブロックの最後でトランザクションを自動的にコミットしようとしません。
関連する論点
非同期APIによる並行操作
非同期APIを使用すると、トランザクション内で操作を同時に実行できるため、パフォーマンスを向上させることができます。アプリケーションが従う必要のある2つのルールがあります。
-
最初のミューテーションは、単独で、シリアルに、実行する必要があります。これは、最初のミューテーションがトランザクションのためのメタデータの作成をトリガーするためです。
-
トランザクションライブラリーが、障害が発生した場合にロールバックする必要のある操作を追跡できるように、すべての同時操作は、完全に完了できる必要があります。つまり、アプリケーションはエラーを「飲み込む」必要がありますが、エラーが発生したことを記録し、並行操作の最後にエラーが発生した場合は、エラーをスローしてトランザクションを再試行します。
これらのルールの実例は、以下のコードに示されています:
List<String> docIds = Arrays.asList("doc1", "doc2", "doc3", "doc4", "doc5");
ReactiveCollection coll = collection.reactive();
TransactionResult result = transactions.reactive((ctx) -> {
// Tracks whether all operations were successful
AtomicBoolean allOpsSucceeded = new AtomicBoolean(true);
// The first mutation must be done in serial, as it also creates a metadata
// entry
return ctx.get(coll, docIds.get(0)).flatMap(doc -> {
JsonObject content = doc.contentAsObject();
content.put("value", "updated");
return ctx.replace(doc, content);
})
// Do all other docs in parallel
.thenMany(Flux.fromIterable(docIds.subList(1, docIds.size()))
.flatMap(docId -> ctx.get(coll, docId).flatMap(doc -> {
JsonObject content = doc.contentAsObject();
content.put("value", "updated");
return ctx.replace(doc, content);
}).onErrorResume(err -> {
allOpsSucceeded.set(false);
// App should replace this with logging
err.printStackTrace();
// Allow other ops to finish
return Mono.empty();
}),
// Run these in parallel
docIds.size())
// The commit or rollback must also be done in serial
).then(Mono.defer(() -> {
// Commit iff all ops succeeded
if (allOpsSucceeded.get()) {
return ctx.commit();
} else {
throw new RuntimeException("Retry the transaction");
}
}));
}).block();
カスタムメタデータコレクション
前述のように、トランザクションは自動的にメタデータドキュメントを作成して使用します。デフォルトでは、これらはトランザクションで最初に変更されたドキュメントのバケットのデフォルトコレクションに作成されます。オプションとして、デフォルト以外のコレクションを指定してメタデータドキュメントを保存できます。ほとんどのユーザーはこの機能を使用する必要はなく、デフォルトの動作を引き続き使用できます。これらは、次のユースケース向けに提供されています。
-
メタデータドキュメントには、各トランザクションに関係するドキュメントについて、ドキュメントのキーとバケットの名前、スコープ、およびそれが存在するコレクションが含まれています。これらに機密データが含まれ、コレクションレベルのアクセス制御を行う必要がある可能性があります。
-
デフォルトのコレクションを削除したい場合。実行する前に、デフォルトのコレクションでメタデータドキュメントを使用する既存のすべてのトランザクションが終了していることを確認する必要があります。
使用法
カスタムメタデータコレクションは、次の方法で有効になります。
Collection metadataCollection = null; // this is a Collection opened by your code earlier
Transactions transactions = Transactions.create(cluster,
TransactionConfigBuilder.create().metadataCollection(metadataCollection));
コレクションを指定した場合:
-
この
Transactions
オブジェクトから作成されたトランザクションはすべて、そのコレクションにメタデータを作成して使用します。 -
この
Transactions
オブジェクトによって開始された非同期クリーンアップは、このコレクションでのみ期限切れのトランザクションを検索します。
アプリケーションにRBACデータの読み取りおよび書き込み権限があることを確実する必要があります。また、既存のトランザクションに干渉する可能性があるため、コレクションを削除してはなりません。既存のコレクションを使用することも、新しいコレクションを作成することもできます。
複数のTransactions
オブジェクトを利用する正当性
通常、アプリケーションに必要なTransactions
オブジェクトは1つだけです。ただし、アプリケーションが複数のカスタムメタデータコレクションを持つ必要がある場合が考えられます。複数のTransactions
オブジェクトを作成することは、これを可能にする唯一の方法であるため、この場合は、複数のTransactions
オブジェクトを利用するのは妥当です。
ライブラリーは、このシナリオに基づいて作成されている複数のTransactions
オブジェクトについて警告しません。
遅延コミット
遅延コミット機能は現在アルファ版であり、APIは変更される可能性があります。
遅延コミットにより、コミットポイントの直前でトランザクションを一時停止できます。そのことにより、トランザクションを完了するために必要なすべてのものをコンテキストにバンドルして、文字列またはバイト配列にシリアル化し、他の場所(たとえば、別のプロセス)で逆シリアル化することができます。その後、トランザクションをコミットまたはロールバックできます。
この機能の背後にある目的は、複数のデータベースにまたがる可能性のある複数のトランザクションをコミットポイントの直前に移動し、すべてを一緒にコミットできるようにすることです。
最初のコミットを延期し、トランザクションをシリアル化する例を次に示します。
try {
TransactionResult result = transactions.run((ctx) -> {
JsonObject initial = JsonObject.create().put("val", 1);
ctx.insert(collection, "a-doc-id", initial);
// Defer means don't do a commit right now. `serialized` in the result will be
// present.
ctx.defer();
});
// Available because ctx.defer() was called
assert (result.serialized().isPresent());
TransactionSerializedContext serialized = result.serialized().get();
// This is going to store a serialized form of the transaction to pass around
byte[] encoded = serialized.encodeAsBytes();
} catch (TransactionFailed e) {
// System.err is used for example, log failures to your own logging system
System.err.println("Transaction did not reach commit point");
for (LogDefer err : e.result().log().logs()) {
System.err.println(err.toString());
}
}
そして、後でトランザクションをコミットします。
TransactionSerializedContext serialized = TransactionSerializedContext.createFrom(encoded);
try {
TransactionResult result = transactions.commit(serialized);
} catch (TransactionFailed e) {
// System.err is used for example, log failures to your own logging system
System.err.println("Transaction did not reach commit point");
for (LogDefer err : e.result().log().logs()) {
System.err.println(err.toString());
}
}
または、トランザクションをロールバックすることもできます。
TransactionSerializedContext serialized = TransactionSerializedContext.createFrom(encoded);
try {
TransactionResult result = transactions.rollback(serialized);
} catch (TransactionFailed e) {
// System.err is used for example, log failures to your own logging system
System.err.println("Transaction did not reach commit point");
for (LogDefer err : e.result().log().logs()) {
System.err.println(err.toString());
}
}
トランザクションの有効期限タイマーは、トランザクションが開始されるとスタートし、トランザクションが遅延状態にある間は一時停止されません。
参考情報
トランザクションのドキュメントには、Couchbaseでトランザクションがどのように機能するかについての説明がたくさんあります。
トランザクションの例のリポジトリで、さらにコード例を見つけることができます。