Amazon DynamoDB Transactions
公式サイトのsampleソースを整理してみた。
DynamoDB トランザクションの例
いつもなら公式Githubへのリンクや記載あるのですが、これ検索してもコードが見当たらなかったのがキッカケです。
準備
localstackを使ってローカル環境で動作確認します。
localstack環境については過去記事を参考にしてください。
LocalStack導入の準備
dynamodb-adminを利用してlocalstack環境のDynamoDBにアクセスできるようにします。ブラウザからGUI操作が可能になるのでとても便利です。docker-compose.ymlは以下のようになります。
version: '3.8'
services:
localstack:
container_name: localstack
image: localstack/localstack:latest
ports:
- 4566:4566
volumes:
- ./docker/localstack:/docker-entrypoint-initaws.d
- /var/run/docker.sock:/var/run/docker.sock
- 'localstack-data:/tmp/localstack'
environment:
- SERVICES=dynamodb
- AWS_ACCESS_KEY_ID=dummy
- AWS_SECRET_ACCESS_KEY=dummy
- AWS_DEFAULT_REGION=ap-northeast-1
- DATA_DIR=/tmp/localstack/data
dynamodb-admin:
image: aaronshaf/dynamodb-admin:latest
environment:
- DYNAMO_ENDPOINT=localstack:4566
- AWS_REGION=ap-northeast-1
ports:
- '8001:8001'
depends_on:
- localstack
volumes:
localstack-data:
準備が完了したらコンテナを起動します。
docker-compose up -d
Container Started
と表示されたらdynamodb-adminにアクセスします。
http://localhost:8001
トランザクションのオールオアナッシングオペレーション
DynamoDBが提供しているトランザクションはJavaで使用するトランザクションマネージャーのようなstart、commitの実装とは異なります。java sdk v2では必須ロジックである条件の設定(テーブルにキー項目が存在するかなど)、次に条件付きINSERT処理または条件付きUPDATE処理を実装、それら全ての条件を満たした場合のみトランザクションで管理しているCRUD操作をcommitするような作りになっています。条件を正しく設定しないとエラーとなってしまうため、やや扱いにくい印象です。公式サンプルソースでは下記の3つの条件を満たす必要があります。
1.顧客 ID が有効であることを確認します。
2.製品が IN_STOCK であることを確認し、製品のステータスを SOLD に更新します。
3.注文がまだ存在していないことを確認し、注文を作成します。
テーブルの作成
公式のサンプルを動作させるために3つのテーブルを作成します。
・「Customers」:keyは"CustomerId"
・「ProductCatalog」:keyは"ProductId"
・「Orders」:keyは"OrderId"
table-adminを使ってローカル環境のDynamoDBにCustomersテーブルを作成していきます。
Customersテーブル(顧客 ID が有効であることを確認)
1.青いボタン「Create table」をクリックします。
2.「Table Name」欄に「Customers」、「Hash Attribute Name」欄に「CustomerId」と入力して「Submit」ボタンをクリックします。(他の値はそのままで構いません)
3.「Customers」テーブルが作成されていることを確認します。
4.トランザクションの条件、
customerId が 09e8e9c8-ec48 に等しい顧客が顧客テーブルに存在することを確認
を満たす必要があるため、レコードを追加します。追加したテーブル「Customers」をクリックします。
5.右上にある「Create item」をクリックします。
6.json形式のレコード作成画面が表示されますので、key値の"09e8e9c8-ec48"を入力して「Save」をクリックします。
7.Saveが完了するとレコードが作成され「Delete」ボタンが表示されます。「Customers」をクリックして一覧画面に戻ります。
8.CustomerIdが"09e8e9c8-ec48"のレコードが追加されていることが分かります。
ProductCatalogテーブル(製品のステータスを SOLD に更新)
同じ要領でProductCatalogテーブルにレコードを追加します。更新の条件に、
製品ステータスが現在 IN_STOCK に設定されている条件が true の場合に、製品ステータスを SOLD に更新するアクションを定義します。
とあるので「ProductStatus」フィールドに"IN_STOCK"をもつレコードを追加します。
key値の指定はないので仮で"prd-001"を設定します。Customersテーブル作成時と同様の手順でProductCatalogテーブルを作成、以下のレコードを登録します。
Ordersテーブル(注文がまだ存在していないことを確認)
Ordersテーブルに関してはレコードが存在しないことが条件のため、テーブルを作成するだけで完了です。(レコードは不要)
Sampleソースの実装
冒頭でも記載しましたが
DynamoDB トランザクションの例
をベースにmainメソッドで完結するロジックを実装しています。コンソールログに「transaction success」と表示されたら成功です。
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ConditionCheck;
import com.amazonaws.services.dynamodbv2.model.InternalServerErrorException;
import com.amazonaws.services.dynamodbv2.model.Put;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity;
import com.amazonaws.services.dynamodbv2.model.ReturnValuesOnConditionCheckFailure;
import com.amazonaws.services.dynamodbv2.model.TransactWriteItem;
import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsRequest;
import com.amazonaws.services.dynamodbv2.model.TransactionCanceledException;
import com.amazonaws.services.dynamodbv2.model.Update;
public class AwsApplication {
/**
* localstack + Dynamodb transaction
*/
public static void main(String[] args) {
// クライアント
AmazonDynamoDB client = AmazonDynamoDBClient.builder()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy")))
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "ap-northeast-1"))
.build();
// Customersテーブル
final String CUSTOMER_TABLE_NAME = "Customers";
final String CUSTOMER_PARTITION_KEY = "CustomerId";
final String customerId = "09e8e9c8-ec48";
final HashMap<String, AttributeValue> customerItemKey = new HashMap<String, AttributeValue>();
customerItemKey.put(CUSTOMER_PARTITION_KEY, new AttributeValue(customerId));
// 条件チェック
ConditionCheck checkItem = new ConditionCheck()
.withTableName(CUSTOMER_TABLE_NAME)
.withKey(customerItemKey)
.withConditionExpression("attribute_exists(" + CUSTOMER_PARTITION_KEY + ")");
// ProductCatalogテーブル
final String PRODUCT_TABLE_NAME = "ProductCatalog";
final String PRODUCT_PARTITION_KEY = "ProductId";
final String productId = "prd-001";
HashMap<String, AttributeValue> productItemKey = new HashMap<String, AttributeValue>();
productItemKey.put(PRODUCT_PARTITION_KEY, new AttributeValue(productId));
Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>();
expressionAttributeValues.put(":new_status", new AttributeValue("SOLD")); // 更新後の値
expressionAttributeValues.put(":expected_status", new AttributeValue("IN_STOCK")); // 既存レコードの値
// 更新条件チェック
Update markItemSold = new Update()
.withTableName(PRODUCT_TABLE_NAME)
.withKey(productItemKey)
.withUpdateExpression("SET ProductStatus = :new_status")
.withExpressionAttributeValues(expressionAttributeValues)
.withConditionExpression("ProductStatus = :expected_status")
.withReturnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD);
// Ordersテーブル
final String ORDER_TABLE_NAME = "Orders";
final String ORDER_PARTITION_KEY = "OrderId";
final String orderId = "ord-001";
HashMap<String, AttributeValue> orderItem = new HashMap<String, AttributeValue>();
orderItem.put(ORDER_PARTITION_KEY, new AttributeValue(orderId));
orderItem.put(PRODUCT_PARTITION_KEY, new AttributeValue(productId));
orderItem.put(CUSTOMER_PARTITION_KEY, new AttributeValue(customerId));
orderItem.put("OrderStatus", new AttributeValue("CONFIRMED"));
orderItem.put("OrderTotal", new AttributeValue("100"));
// 新規登録条件チェック
Put createOrder = new Put()
.withTableName(ORDER_TABLE_NAME)
.withItem(orderItem)
.withReturnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)
.withConditionExpression("attribute_not_exists(" + ORDER_PARTITION_KEY + ")");
Collection<TransactWriteItem> actions = Arrays.asList(
new TransactWriteItem().withConditionCheck(checkItem),
new TransactWriteItem().withPut(createOrder),
new TransactWriteItem().withUpdate(markItemSold));
TransactWriteItemsRequest placeOrderTransaction = new TransactWriteItemsRequest()
.withTransactItems(actions)
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
try {
// トランザクション実行
client.transactWriteItems(placeOrderTransaction);
System.out.println("transaction success");
} catch (ResourceNotFoundException e) {
System.err.println("テーブル定義エラー : " + e.getMessage());
} catch (InternalServerErrorException e) {
System.err.println("サーバーエラー : " + e.getMessage());
} catch (TransactionCanceledException e) {
// 条件式の 1 つの条件が満たされていません。
// TransactWriteItems リクエストのテーブルが別のアカウントまたはリージョンにあります。
// TransactWriteItems 操作の複数のアクションが同じアイテムを対象としています。
// トランザクションを完了するには、プロビジョニングされたキャパシティーが不十分です。
// 項目のサイズが大きくなりすぎる (400 KB を超える) か、
// ローカル セカンダリ インデックス (LSI) が大きくなりすぎるか、
// トランザクションによって行われた変更が原因で同様の検証エラーが発生します。
// 無効なデータ形式などのユーザー エラーがあります。
System.out.println("transaction error : " + e.getMessage());
}
}
}
TransactionCanceledExceptionから受け取るエラーメッセージを機械翻訳でコメントで記載しましたが、慣れないうちはExceptionばかり発生して結構大変でした。一度実装してイメージ出来るとDynamoDBのトランザクション機能を活用したエラーハンドリングなど、実装の幅が広がります。また、pom.xmlはsdk1のbomとDynamoDBだけ記載しておけばとりあえず動作します。一部抜粋して載せておきます。
<!-- BOM -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.12.353</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
</dependency>
</dependencies>