0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Couchbase Server Java SDK解説:分散トランザクションプログラミング入門

Last updated at Posted at 2021-07-15

概要

分散トランザクションによって、複数のドキュメントにまたがるドキュメントの変更(ミューテーション )、つまりインサート、リプレース、およびデリートに対して、一つトランザクションスコープ内で実行することができるようになります。トランザクションがコミットされるまで、これらのミューテーションは、他のトランザクション(Couchbaseデータプラットフォームの他の部分のどこにも)に反映されません。

Couchbaseの分散トランザクションは、次のような特徴を持ちます。

  • 開発者がトランザクションをロジックのブロックとして表現できる、高レベルで使いやすいAPI。
  • 高いパフォーマンスとスケーラビリティをサポートするための、リードコミットの分離レベル。
  • エラー処理(他のトランザクションとの競合に関するものを含む)

要件

製品バージョン

  • Couchbase Server6.6.1以降
  • Couchbase Javaクライアント3.1.5以降

前提

  • Couchbaseクラスターの全てのノードが同期するようにNTPが構成されている
  • アプリケーションが、予約されている拡張属性(XATTR)フィールドを使用していない。
  • バケットにドキュメントのレプリカが設定されている。

実行準備

Couchbaseトランザクションライブラリをプロジェクトに追加します。

Gradleの例:

dependencies {
    compile group: 'com.couchbase.client', name: 'couchbase-transactions', version: '1.1.8'
}

Mavenの例:

<dependency>
    <groupId>com.couchbase.client</groupId>
    <artifactId>couchbase-transactions</artifactId>
    <version>1.1.8</version>
</dependency>

これにより、Couchbase Java SDKを含め、このライブラリの推移的な依存関係が自動的に取り込まれます。

インポートするCouchbaseパッケージは次のとおりです。

import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.GetResult;
import com.couchbase.client.java.query.QueryOptions;
import com.couchbase.client.java.query.QueryProfile;
import com.couchbase.client.java.query.QueryResult;
import com.couchbase.client.tracing.opentelemetry.OpenTelemetryRequestSpan;
import com.couchbase.transactions.TransactionDurabilityLevel;
import com.couchbase.transactions.TransactionGetResult;
import com.couchbase.transactions.TransactionQueryOptions;
import com.couchbase.transactions.TransactionResult;
import com.couchbase.transactions.Transactions;
import com.couchbase.transactions.config.PerTransactionConfigBuilder;
import com.couchbase.transactions.config.TransactionConfigBuilder;
import com.couchbase.transactions.deferred.TransactionSerializedContext;
import com.couchbase.transactions.error.TransactionCommitAmbiguous;
import com.couchbase.transactions.error.TransactionFailed;
import com.couchbase.transactions.log.IllegalDocumentState;
import com.couchbase.transactions.log.LogDefer;
import com.couchbase.transactions.log.TransactionCleanupAttempt;
import com.couchbase.transactions.log.TransactionCleanupEndRunEvent;

以下のコードでは、下記のパッケージも利用しています。

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

import io.opentelemetry.api.trace.Span;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

コード例による解説

Transactionsオブジェクトの構築

アプリケーションは、通常Transactionsオブジェクトをただ一つだけ持ちます。

// Couchbaseクラスターの初期化
Cluster cluster = Cluster.connect("localhost", "Administrator", "password");
Bucket bucket = cluster.bucket("travel-sample");
Collection collection = bucket.defaultCollection();

// シングルTransactionsオブジェクトの作成
Transactions transactions = Transactions.create(cluster);

耐久性(Durability)設定

Transactionsオブジェクト作成時に、以下のようにオプションを指定することができます。

Transactions transactions = Transactions.create(cluster,
        TransactionConfigBuilder.create().durabilityLevel(TransactionDurabilityLevel.PERSIST_TO_MAJORITY)
                .logOnFailure(true, Event.Severity.WARN)
                .build());

デフォルト構成では、Majority耐久性設定を使用してすべての変更(ミューテーション)が実行されます。つまり、トランザクションが続行される前に、書き込みが過半数のレプリカに対してメモリレベルで実行されます。

上記の例では、TransactionDurabilityLevel.PERSIST_TO_MAJORITYによって、すべてのミューテーションが、トランザクションが続行される前に、アクティブノードとレプリカノードの過半数に対して、物理ストレージに書き込まれます。これにより、レイテンシは高くなりますが、永続化に関する高い信頼性要件を実現することができます。

Couchbaseの永続化オプションには、Noneのレベルも存在しますが(この場合、アクティブノードのメモリ上への反映が行われた後、レプリカへの反映や物理ストレージへの反映が行われる前に、処理が続行されます)、トランザクション操作においては推奨されておらず、サポートされていません。耐久性がNoneに設定されている場合、トランザクションの原子性は保証されません。

トランザクション・プログラミングの基本

トランザクション・プログラミングの基本的な考え方としては、アプリケーションがラムダブロックとしてロジックを提供し、トランザクションライブラリがトランザクションのコミットを処理します。

ここで、ラムダは、複数回実行される可能性があることを理解しておく必要があります(別のトランザクションとの一時的な競合など、一部のタイプの一時的なエラーを処理するため)。

Couchbase Javaクライアントと同様に、トランザクションライブラリは、同期と非同期のどちらのモードでも使用できます。

同期モード

try {
    transactions.run((ctx) -> {
        // 'ctx'はAttemptContextオブジェクトです。

        // ... トランザクションロジックは、このラムダブロック内に記述されます ...

        // この'ctx.commit()'メソッドの呼び出しはオプションです。
        // この記述が省略されていても、ラムダを抜ける際に、コミットされます。
        ctx.commit();
    });
} catch (TransactionCommitAmbiguous e) {
    // 単純化のため、標準エラー出力にログを出力
    System.err.println("Transaction possibly committed");

    for (LogDefer err : e.result().log().logs()) {
        System.err.println(err.toString());
    }
} catch (TransactionFailed e) {
    System.err.println("Transaction did not reach commit point");

    for (LogDefer err : e.result().log().logs()) {
        System.err.println(err.toString());
    }
}

同期モードの方が、記述・理解が簡単ですが、非同期APIを使用すると、スレッドプールを使用せずに、リアクティブスタイルでアプリケーションを構築できます。これにより、リソース利用効率が高まり、よりスケーリングできます。

非同期モード(Reactorライブラリを使用)

Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
    // 'ctx'はAttemptContextオブジェクトです。providing asynchronous versions of the
    // AttemptContext methods

    return

    // トランザクションロジックは、このラムダブロック内に記述されます。: 例として、'get'と'remove'を利用
    ctx.get(collection.reactive(), "document-id").flatMap(doc -> ctx.remove(doc))
            // この'ctx.commit()'メソッドの呼び出しはオプションです。
                // この記述が省略されていても、ラムダを抜ける際に、コミットされます。
            .then(ctx.commit());
}).doOnError(err -> {
    if (err instanceof TransactionCommitAmbiguous) {
        // 単純化のため、標準エラー出力にログを出力
        System.err.println("Transaction possibly committed: ");
    } else {
        System.err.println("Transaction failed: ");
    }

    for (LogDefer e : ((TransactionFailed) err).result().log().logs()) {
        System.err.println(err.toString());
    }
});

// 通常、さらに非同期で処理を連鎖させます。
// 単純化のため、ここで結果をブロックしています
TransactionResult finalResult = result.block();

リアクティブプログラミング一般に関係する注意点

一部のAttemptContextReactiveメソッド、 例えば、remove、ではMono<Void>が返されます。
これは、次のイベントをトリガーするのではなく、完了イベントをトリガーします。そのため、ここではthenを使用することになります。flatMapまたはそれに類したものを使用するのではないことに注意してください。

サンプルコード

主なトランザクション操作の簡単な要約を、コメントを交えたサンプルコードとして、示します。

同期モード

try {
    TransactionResult result = transactions.run((ctx) -> {
        // ドキュメント(doc-a)をデータベースにインサート
        ctx.insert(collection, "doc-a", JsonObject.create());

        // ドキュメント(doc-a)をデータベースから取得
        // ドキュメントが存在するかどうか不明な場合は、ctx.getOptionalを使用
        Optional<TransactionGetResult> docOpt = ctx.getOptional(collection, "doc-a");

        // ドキュメントが必ず存在する場合は、ctx.getを使用
        // ドキュメントが存在しない場合、トランザクションが失敗します
        TransactionGetResult docA = ctx.get(collection, "doc-a");

        // ドキュメント(doc-b)を取得し、値を追加した後、
        // 変更されたドキュメントで、データベース内のドキュメントをリプレイス
        TransactionGetResult docB = ctx.get(collection, "doc-b");
        JsonObject content = docB.contentAs(JsonObject.class);
        content.put("transactions", "are awesome");
        ctx.replace(docB, content);

        // ドキュメント(doc-c)の取得と削除
        TransactionGetResult docC = ctx.get(collection, "doc-c");
        ctx.remove(docC);

        ctx.commit();
    });
} catch (TransactionCommitAmbiguous e) {
    System.err.println("Transaction possibly committed");

    for (LogDefer err : e.result().log().logs()) {
        System.err.println(err.toString());
    }
} catch (TransactionFailed e) {
    System.err.println("Transaction did not reach commit point");

    for (LogDefer err : e.result().log().logs()) {
        System.err.println(err.toString());
    }
}

非同期モード

Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
    return
    // ドキュメント(doc-a)をデータベースにインサート
    ctx.insert(collection.reactive(), "doc-a", JsonObject.create())

            // ドキュメント(doc-b)を取得し、値を追加した後、
            // 変更されたドキュメントで、データベース内のドキュメントをリプレイス
            .then(ctx.get(collection.reactive(), "doc-b")).flatMap(docB -> {
                JsonObject content = docB.contentAs(JsonObject.class);
                content.put("transactions", "are awesome");
                return ctx.replace(docB, content);
            })

            // ドキュメント(doc-c)の取得と削除
            .then(ctx.get(collection.reactive(), "doc-c")).flatMap(doc -> ctx.remove(doc))

            // コミット
            .then(ctx.commit());

}).doOnError(err -> {
    if (err instanceof TransactionCommitAmbiguous) {
        System.err.println("Transaction possibly committed: ");
    } else {
        System.err.println("Transaction failed: ");
    }

    for (LogDefer e : ((TransactionFailed) err).result().log().logs()) {
        // 単純化のため、標準エラー出力にログを出力
        System.err.println(err.toString());
    }
});

// 通常、さらに非同期で処理を連鎖させます。
// 単純化のため、ここで結果をブロックしています
result.block();

最後に

今回は、Couchbase分散トランザクションプログラミングの入門編として、サンプルコードを、コメントを交えて、紹介するところまで行いました。
別の機会に、より詳細な解説を発表したいと思います。

参考情報

Couchbase公式ドキュメント Java SDK / Advanced Data Operations / Distributed ACID Transactions

Javadoc couchbase-transactions API

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?