はじめに
もう、この記事のタイトルからして「トランザクション管理するならDynamoDBなんて使わないでRDB使いやがれ!」って言われそうなんだけど、必要になってしまったのだから仕方ない。
まあ、基本的な動作検証については以下の記事がよくまとまっているので、ほぼこれをなぞったような感じではあるのだけど……。ひとまずいくつか動かしてみたので、記録しておく。
【Qiita】DynamoDBでのトランザクションを実際に使って考えてみる。
DynamoDBの構成
ひとまず、やりたいことはGSIとかLSIはあまり関係ないのだけど、たまに「使わなかったら問題ないけど使ったら想定通りに動かない」⇒「調べてみたら制限事項でしたー!」なことがあるので、最初からGSIもLSIも定義して動かしてみる。
IdとNameで構成された入った普通のテーブルだ。
都合上、Idを同じにしてNameを別にするというデータ投入をしているが、あまり気にしないでもらいたい。
resource "aws_dynamodb_table" "test" {
name = local.dynamodb_table_name
billing_mode = "PROVISIONED"
read_capacity = 1
write_capacity = 1
hash_key = "id"
range_key = "name"
attribute {
name = "id"
type = "S"
}
attribute {
name = "name"
type = "S"
}
local_secondary_index {
name = "name_lsi"
range_key = "name"
projection_type = "ALL"
}
global_secondary_index {
name = "name_gsi"
hash_key = "name"
write_capacity = 1
read_capacity = 1
projection_type = "ALL"
}
}
DynamoDB Transactionsの基本
基本的なところは例によってAWS公式の開発者ガイドを読むのが良い。
端折って書くと、RDBのトランザクションのように「ここからトランザクションを始めて、ここでCommit/Rollbackする」というような管理方法ではない。
TransactWriteItems
という関数に渡したアイテムの範囲で、トランザクション管理をしてくれるという仕様だ。TransactWriteItems
のパラメータである TransactWriteItemsInput
が少し癖があるので、実際にこの後の例を見て、使いながら覚えていこう。
なお、↑に記載している DynamoDB のテーブルを Marshal/Unmarshal するために、
type item struct {
Id string `dynamodbav:"id"`
Name string `dynamodbav:"name"`
}
という構造体を定義している。
基本の基本、まとめてPutItemをして成功させてみる
func test001() error {
var (
transactItems []*dynamodb.TransactWriteItem
)
items := []item{
{Id: "00001", Name: "taro"},
{Id: "00001", Name: "jiro"},
{Id: "00001", Name: "saburo"},
{Id: "00001", Name: "shiro"},
{Id: "00001", Name: "goro"},
}
for _, item := range items {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems = append(transactItems, &dynamodb.TransactWriteItem{
Put: &dynamodb.Put{
TableName: aws.String(ddbTableName),
Item: itemav,
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems: %s", err.Error()))
}
return nil
}
TransactWriteItemsInput
は、TransactWriteItem
の配列をインプットに持つため、これをまずは作成する。さらに、TransactWriteItem
は、PutItem なら dynamodb.Put
、UpdateItem なら dynamodb.Update
、DeleteItem なら dynamodb.Delete
の構造体(厳密には構造体のポインタ)を引数に必要とする。
PutItem の Item や Keys は、DynamoDbAttributes
の形式であるため、構造体を MarshalMap
したものを渡してあげよう。
実行すると、ちゃんとアイテムが Put された。
まあ、これは特に Transactions を使っても使わなくてもできることなので当たり前である。
まとめて PutItem しようとして途中で失敗したら……?
さて、では上記のパターンで途中で失敗したらどうなるだろうか?
前半ブロックでは3件のアイテムを Putして、後半ブロックでは前半に登録したアイテムを含む5件のアイテムを Put する。
func test002() error {
var (
transactItems1 []*dynamodb.TransactWriteItem
transactItems2 []*dynamodb.TransactWriteItem
)
// 準備 3件 PutItem する
items1 := []item{
{Id: "00001", Name: "saburo"},
{Id: "00001", Name: "shiro"},
{Id: "00001", Name: "goro"},
}
for _, item := range items1 {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems1 = append(transactItems1, &dynamodb.TransactWriteItem{
Put: &dynamodb.Put{
TableName: aws.String(ddbTableName),
Item: itemav,
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems1,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems 1st: %s", err.Error()))
}
// 本番 5件 PutItem する
// 3,4,5件目が重複するようにしてエラーを発生させる
// ConditionExpression を指定しないと、重複したら上書きされる
items2 := []item{
{Id: "00001", Name: "taro"},
{Id: "00001", Name: "jiro"},
{Id: "00001", Name: "saburo"},
{Id: "00001", Name: "shiro"},
{Id: "00001", Name: "goro"},
}
for _, item := range items2 {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
Put: &dynamodb.Put{
TableName: aws.String(ddbTableName),
Item: itemav,
ConditionExpression: aws.String("attribute_not_exists(id) and attribute_not_exists(#name)"),
ExpressionAttributeNames: map[string]*string{
"#name": aws.String("name"),
},
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems2,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems 2nd: %s", err.Error()))
}
return nil
}
コメントに書いたように、
ConditionExpression: aws.String("attribute_not_exists(id) and attribute_not_exists(#name)"),
これがないと、重複時に上書きされてエラーにならないので入れている。
name
の前についている井桁はプレースホルダで、name
は予約語とかぶってしまったので、次の行の
ExpressionAttributeNames: map[string]*string{
"#name": aws.String("name"),
},
でプレースホルダ化してかぶらないようにしている。
これを実行すると
TransactionCanceledException: Transaction cancelled, please refer cancellation reasons for specific reasons [None, None, ConditionalCheckFailed, ConditionalCheckFailed, ConditionalCheckFailed]
{
RespMetadata: {
StatusCode: 400,
RequestID: "8I0KPTLCH68OF4MS27S0INPDTBVV4KQNSO5AEMVJF66Q9ASUAAJG"
},
CancellationReasons: [
{
Code: "None"
},
{
Code: "None"
},
{
Code: "ConditionalCheckFailed",
Message: "The conditional request failed"
},
{
Code: "ConditionalCheckFailed",
Message: "The conditional request failed"
},
{
Code: "ConditionalCheckFailed",
Message: "The conditional request failed"
}
],
Message_: "Transaction cancelled, please refer cancellation reasons for specific reasons [None, None, ConditionalCheckFailed, ConditionalCheckFailed, ConditionalCheckFailed]"
}
なエラーが返却され、DynamoDB上でも
な感じで、ロールバックされていることが分かる。
DeleteItem → PutItemをまとめて行ってみる
DynamoDB はキー項目の更新ができないので、たまにやりたくなる操作がこれである。
キー項目更新しようとしてる時点で設計が悪いのでは……?というツッコミは抜きにしておこうか……。
func test003() error {
var (
transactItems1 []*dynamodb.TransactWriteItem
transactItems2 []*dynamodb.TransactWriteItem
)
// 準備 5件 PutItem する
items1 := []item{
{Id: "00001", Name: "taro"},
{Id: "00001", Name: "jiro"},
{Id: "00001", Name: "saburo"},
{Id: "00001", Name: "shiro"},
{Id: "00001", Name: "goro"},
}
for _, item := range items1 {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems1 = append(transactItems1, &dynamodb.TransactWriteItem{
Put: &dynamodb.Put{
TableName: aws.String(ddbTableName),
Item: itemav,
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems1,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems 1st: %s", err.Error()))
}
// 本番 5件 DeleteItem → PutItem する
items2 := []item{
{Id: "00001", Name: "TARO"},
{Id: "00001", Name: "JIRO"},
{Id: "00001", Name: "SABURO"},
{Id: "00001", Name: "SHIRO"},
{Id: "00001", Name: "GORO"},
}
for _, item := range items1 {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
Delete: &dynamodb.Delete{
TableName: aws.String(ddbTableName),
Key: itemav,
},
})
}
for _, item := range items2 {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
Put: &dynamodb.Put{
TableName: aws.String(ddbTableName),
Item: itemav,
ConditionExpression: aws.String("attribute_not_exists(id) and attribute_not_exists(#name)"),
ExpressionAttributeNames: map[string]*string{
"#name": aws.String("name"),
},
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems2,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems 2nd: %s", err.Error()))
}
return nil
}
前半で items1 を PutItem し、後半で items1 を DeleteItem してから items2 を PutItem してる。
DeleteItem では Key 属性を全部渡す必要があるので、
Delete: &dynamodb.Delete{
TableName: aws.String(ddbTableName),
Key: itemav,
},
とするのが基本の基本である。
これもエラーにならないパターンなので、当然ながら疑似的なキーの更新は成功する。
DeleteItem 後の PutItem が失敗したら?
トランザクション管理をしているのだから、DeleteItem も PutItem もまとめて戻ることを期待する。
test003
を修正し、以下の通り、PutItem の5件目でキーの重複が発生するように動作させる。
func test004() error {
var (
transactItems1 []*dynamodb.TransactWriteItem
transactItems2 []*dynamodb.TransactWriteItem
)
// 準備 6件 PutItem する
items1 := []item{
{Id: "00001", Name: "taro"},
{Id: "00001", Name: "jiro"},
{Id: "00001", Name: "saburo"},
{Id: "00001", Name: "shiro"},
{Id: "00001", Name: "goro"},
{Id: "00001", Name: "ROCK"},
}
for _, item := range items1 {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems1 = append(transactItems1, &dynamodb.TransactWriteItem{
Put: &dynamodb.Put{
TableName: aws.String(ddbTableName),
Item: itemav,
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems1,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems 1st: %s", err.Error()))
}
// 本番 5件 DeleteItem → PutItem する
// DeleteItemを1件残して、PutItem の5件目が重複するようにしてエラーを発生させる
// ConditionExpression を指定しないと、重複したら上書きされる
items2 := []item{
{Id: "00001", Name: "TARO"},
{Id: "00001", Name: "JIRO"},
{Id: "00001", Name: "SABURO"},
{Id: "00001", Name: "SHIRO"},
{Id: "00001", Name: "ROCK"},
}
for _, item := range items1[0:5] {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
Delete: &dynamodb.Delete{
TableName: aws.String(ddbTableName),
Key: itemav,
},
})
}
for _, item := range items2 {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
Put: &dynamodb.Put{
TableName: aws.String(ddbTableName),
Item: itemav,
ConditionExpression: aws.String("attribute_not_exists(id) and attribute_not_exists(#name)"),
ExpressionAttributeNames: map[string]*string{
"#name": aws.String("name"),
},
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems2,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems 2nd: %s", err.Error()))
}
return nil
}
これも、実行すると Exception が発生し、途中の DeleteItem, PutItem はすべてなかったことになり、
の更新前の状態に戻る。
ちなみに、これ、00001:ROCK
のレコードも含んで DeleteItem してから PutItem したら、
ValidationException: Transaction request cannot include multiple operations on one item
status code: 400, request id: F855HQ4G5DCQ3J0NNLQETAO3JVVV4KQNSO5AEMVJF66Q9ASUAAJG
のExceptionが発生した。
開発者ガイドにある通り、
同じ TransactWriteItems オペレーション内の複数のアクションが同じ項目をターゲットとしているために、トランザクション検証エラーが発生した場合。
の条件に該当するためだ。
キー項目をまとめて変更しようとしている場合は、注意しよう。
同じキーを DeleteItem → PutItem するのではなく、UpdateItem を混ぜる必要があるということだ。
UpdateItem も試してみよう
ではUpdateItemはどうなるか。
func test005() error {
var (
transactItems1 []*dynamodb.TransactWriteItem
transactItems2 []*dynamodb.TransactWriteItem
)
// 準備 5件 PutItem する
items := []item{
{Id: "00001", Name: "taro"},
{Id: "00001", Name: "jiro"},
{Id: "00001", Name: "saburo"},
{Id: "00001", Name: "shiro"},
{Id: "00001", Name: "goro"},
}
for _, item := range items {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems1 = append(transactItems1, &dynamodb.TransactWriteItem{
Put: &dynamodb.Put{
TableName: aws.String(ddbTableName),
Item: itemav,
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems1,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems 1st: %s", err.Error()))
}
// 本番 5件 UpdateItem する(属性の追加)
for _, item := range items {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
Update: &dynamodb.Update{
TableName: aws.String(ddbTableName),
Key: itemav,
UpdateExpression: aws.String("SET age = :age"),
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":age": {
S: aws.String("20"),
},
},
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems2,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems 2nd: %s", err.Error()))
}
return nil
}
この例では、一度 PutItem して作成したデータに UpdateItem で age の属性を付け加えている。
UpdateItemでも、キーは DeleteItem 同様に Key
属性に情報を渡そう。
UpdateExpression: aws.String("SET age = :age"),
の部分は、RDB の SQL で言うところのSET句と同等と考えれば良い。
さて、これを動作させると、実行結果は
となり、属性の追加が成功した。
UpdateItem が途中で失敗したら?
↑の例で、UpdateItem するときにキーの不一致となる情報を入れてみた。
func test006() error {
var (
transactItems1 []*dynamodb.TransactWriteItem
transactItems2 []*dynamodb.TransactWriteItem
)
// 準備 5件 PutItem する
items1 := []item{
{Id: "00001", Name: "taro"},
{Id: "00001", Name: "jiro"},
{Id: "00001", Name: "saburo"},
{Id: "00001", Name: "shiro"},
{Id: "00001", Name: "goro"},
}
for _, item := range items1 {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems1 = append(transactItems1, &dynamodb.TransactWriteItem{
Put: &dynamodb.Put{
TableName: aws.String(ddbTableName),
Item: itemav,
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems1,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems 1st: %s", err.Error()))
}
// 本番 5件 UpdateItem する(属性の追加)
// 5件目に、準備で入れていないデータを指定してエラーを発生させる
// ConditionExpression を設定しないと、Updateでもアイテムが挿入される
items2 := []item{
{Id: "00001", Name: "taro"},
{Id: "00001", Name: "jiro"},
{Id: "00001", Name: "saburo"},
{Id: "00001", Name: "shiro"},
{Id: "00001", Name: "rock"},
}
for _, item := range items2 {
itemav, _ := dynamodbattribute.MarshalMap(item)
transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
Update: &dynamodb.Update{
TableName: aws.String(ddbTableName),
Key: itemav,
ConditionExpression: aws.String("attribute_exists(id) and attribute_exists(#name)"),
UpdateExpression: aws.String("SET age = :age"),
ExpressionAttributeNames: map[string]*string{
"#name": aws.String("name"),
},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":age": {
S: aws.String("20"),
},
},
},
})
}
if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
TransactItems: transactItems2,
}); err != nil {
return errors.New(fmt.Sprintf("Failed to TransactWriteItems 2nd: %s", err.Error()))
}
return nil
}
もうお分かりだろうが、これを実行すると、Exception が発生し、
と、属性の追加はすべてロールバックされた。
結論
ということで、ちょっと癖があって注意しなければいけないケースはあるものの、良い感じにトランザクション処理が可能であることが分かった。
しかしこれ、LambdaやDynamoDBのバックエンドで動いているHW故障時とかもキレイに戻ってくれるものなのかな……。その辺は不安。