2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

DynamoDB TransactionsをGolangで実装してみる

Last updated at Posted at 2020-10-31

はじめに

もう、この記事のタイトルからして「トランザクション管理するならDynamoDBなんて使わないでRDB使いやがれ!」って言われそうなんだけど、必要になってしまったのだから仕方ない。

まあ、基本的な動作検証については以下の記事がよくまとまっているので、ほぼこれをなぞったような感じではあるのだけど……。ひとまずいくつか動かしてみたので、記録しておく。

【Qiita】DynamoDBでのトランザクションを実際に使って考えてみる。

DynamoDBの構成

ひとまず、やりたいことはGSIとかLSIはあまり関係ないのだけど、たまに「使わなかったら問題ないけど使ったら想定通りに動かない」⇒「調べてみたら制限事項でしたー!」なことがあるので、最初からGSIもLSIも定義して動かしてみる。

IdとNameで構成された入った普通のテーブルだ。
都合上、Idを同じにしてNameを別にするというデータ投入をしているが、あまり気にしないでもらいたい。

resource "aws_dynamodb_table" "test" {
  name           = local.dynamodb_table_name
  billing_mode   = "PROVISIONED"
  read_capacity  = 1
  write_capacity = 1
  hash_key       = "id"
  range_key      = "name"

  attribute {
    name = "id"
    type = "S"
  }

  attribute {
    name = "name"
    type = "S"
  }

  local_secondary_index {
    name            = "name_lsi"
    range_key       = "name"
    projection_type = "ALL"
  }

  global_secondary_index {
    name            = "name_gsi"
    hash_key        = "name"
    write_capacity  = 1
    read_capacity   = 1
    projection_type = "ALL"
  }
}

DynamoDB Transactionsの基本

基本的なところは例によってAWS公式の開発者ガイドを読むのが良い。

端折って書くと、RDBのトランザクションのように「ここからトランザクションを始めて、ここでCommit/Rollbackする」というような管理方法ではない。

TransactWriteItems という関数に渡したアイテムの範囲で、トランザクション管理をしてくれるという仕様だ。TransactWriteItems のパラメータである TransactWriteItemsInput が少し癖があるので、実際にこの後の例を見て、使いながら覚えていこう。

なお、↑に記載している DynamoDB のテーブルを Marshal/Unmarshal するために、

type item struct {
	Id   string `dynamodbav:"id"`
	Name string `dynamodbav:"name"`
}

という構造体を定義している。

基本の基本、まとめてPutItemをして成功させてみる

func test001() error {
	var (
		transactItems []*dynamodb.TransactWriteItem
	)

	items := []item{
		{Id: "00001", Name: "taro"},
		{Id: "00001", Name: "jiro"},
		{Id: "00001", Name: "saburo"},
		{Id: "00001", Name: "shiro"},
		{Id: "00001", Name: "goro"},
	}

	for _, item := range items {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems = append(transactItems, &dynamodb.TransactWriteItem{
			Put: &dynamodb.Put{
				TableName: aws.String(ddbTableName),
				Item:      itemav,
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems: %s", err.Error()))
	}

	return nil
}

TransactWriteItemsInput は、TransactWriteItem の配列をインプットに持つため、これをまずは作成する。さらに、TransactWriteItem は、PutItem なら dynamodb.Put、UpdateItem なら dynamodb.Update、DeleteItem なら dynamodb.Delete の構造体(厳密には構造体のポインタ)を引数に必要とする。

PutItem の Item や Keys は、DynamoDbAttributes の形式であるため、構造体を MarshalMap したものを渡してあげよう。

実行すると、ちゃんとアイテムが Put された。

キャプチャ1.png

まあ、これは特に Transactions を使っても使わなくてもできることなので当たり前である。

まとめて PutItem しようとして途中で失敗したら……?

さて、では上記のパターンで途中で失敗したらどうなるだろうか?

前半ブロックでは3件のアイテムを Putして、後半ブロックでは前半に登録したアイテムを含む5件のアイテムを Put する。

func test002() error {
	var (
		transactItems1 []*dynamodb.TransactWriteItem
		transactItems2 []*dynamodb.TransactWriteItem
	)

	// 準備 3件 PutItem する
	items1 := []item{
		{Id: "00001", Name: "saburo"},
		{Id: "00001", Name: "shiro"},
		{Id: "00001", Name: "goro"},
	}

	for _, item := range items1 {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems1 = append(transactItems1, &dynamodb.TransactWriteItem{
			Put: &dynamodb.Put{
				TableName: aws.String(ddbTableName),
				Item:      itemav,
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems1,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems 1st: %s", err.Error()))
	}

	// 本番 5件 PutItem する
	// 3,4,5件目が重複するようにしてエラーを発生させる
	// ConditionExpression を指定しないと、重複したら上書きされる
	items2 := []item{
		{Id: "00001", Name: "taro"},
		{Id: "00001", Name: "jiro"},
		{Id: "00001", Name: "saburo"},
		{Id: "00001", Name: "shiro"},
		{Id: "00001", Name: "goro"},
	}

	for _, item := range items2 {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
			Put: &dynamodb.Put{
				TableName:           aws.String(ddbTableName),
				Item:                itemav,
				ConditionExpression: aws.String("attribute_not_exists(id) and attribute_not_exists(#name)"),
				ExpressionAttributeNames: map[string]*string{
					"#name": aws.String("name"),
				},
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems2,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems 2nd: %s", err.Error()))
	}

	return nil
}

コメントに書いたように、

                ConditionExpression: aws.String("attribute_not_exists(id) and attribute_not_exists(#name)"),

これがないと、重複時に上書きされてエラーにならないので入れている。
name の前についている井桁はプレースホルダで、name は予約語とかぶってしまったので、次の行の

                ExpressionAttributeNames: map[string]*string{
                    "#name": aws.String("name"),
                },

でプレースホルダ化してかぶらないようにしている。

これを実行すると

TransactionCanceledException: Transaction cancelled, please refer cancellation reasons for specific reasons [None, None, ConditionalCheckFailed, ConditionalCheckFailed, ConditionalCheckFailed]
{
  RespMetadata: {
    StatusCode: 400,
    RequestID: "8I0KPTLCH68OF4MS27S0INPDTBVV4KQNSO5AEMVJF66Q9ASUAAJG"
  },
  CancellationReasons: [
    {
      Code: "None"
    },
    {
      Code: "None"
    },
    {
      Code: "ConditionalCheckFailed",
      Message: "The conditional request failed"
    },
    {
      Code: "ConditionalCheckFailed",
      Message: "The conditional request failed"
    },
    {
      Code: "ConditionalCheckFailed",
      Message: "The conditional request failed"
    }
  ],
  Message_: "Transaction cancelled, please refer cancellation reasons for specific reasons [None, None, ConditionalCheckFailed, ConditionalCheckFailed, ConditionalCheckFailed]"
}

なエラーが返却され、DynamoDB上でも

キャプチャ2.png

な感じで、ロールバックされていることが分かる。

DeleteItem → PutItemをまとめて行ってみる

DynamoDB はキー項目の更新ができないので、たまにやりたくなる操作がこれである。
キー項目更新しようとしてる時点で設計が悪いのでは……?というツッコミは抜きにしておこうか……。

func test003() error {
	var (
		transactItems1 []*dynamodb.TransactWriteItem
		transactItems2 []*dynamodb.TransactWriteItem
	)

	// 準備 5件 PutItem する
	items1 := []item{
		{Id: "00001", Name: "taro"},
		{Id: "00001", Name: "jiro"},
		{Id: "00001", Name: "saburo"},
		{Id: "00001", Name: "shiro"},
		{Id: "00001", Name: "goro"},
	}

	for _, item := range items1 {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems1 = append(transactItems1, &dynamodb.TransactWriteItem{
			Put: &dynamodb.Put{
				TableName: aws.String(ddbTableName),
				Item:      itemav,
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems1,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems 1st: %s", err.Error()))
	}

	// 本番 5件 DeleteItem → PutItem する
	items2 := []item{
		{Id: "00001", Name: "TARO"},
		{Id: "00001", Name: "JIRO"},
		{Id: "00001", Name: "SABURO"},
		{Id: "00001", Name: "SHIRO"},
		{Id: "00001", Name: "GORO"},
	}

	for _, item := range items1 {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
			Delete: &dynamodb.Delete{
				TableName: aws.String(ddbTableName),
				Key:       itemav,
			},
		})
	}
	for _, item := range items2 {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
			Put: &dynamodb.Put{
				TableName:           aws.String(ddbTableName),
				Item:                itemav,
				ConditionExpression: aws.String("attribute_not_exists(id) and attribute_not_exists(#name)"),
				ExpressionAttributeNames: map[string]*string{
					"#name": aws.String("name"),
				},
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems2,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems 2nd: %s", err.Error()))
	}

	return nil
}

前半で items1 を PutItem し、後半で items1 を DeleteItem してから items2 を PutItem してる。

DeleteItem では Key 属性を全部渡す必要があるので、

            Delete: &dynamodb.Delete{
                TableName: aws.String(ddbTableName),
                Key:       itemav,
            },

とするのが基本の基本である。

これもエラーにならないパターンなので、当然ながら疑似的なキーの更新は成功する。

キャプチャ3.png

DeleteItem 後の PutItem が失敗したら?

トランザクション管理をしているのだから、DeleteItem も PutItem もまとめて戻ることを期待する。

test003 を修正し、以下の通り、PutItem の5件目でキーの重複が発生するように動作させる。

func test004() error {
	var (
		transactItems1 []*dynamodb.TransactWriteItem
		transactItems2 []*dynamodb.TransactWriteItem
	)

	// 準備 6件 PutItem する
	items1 := []item{
		{Id: "00001", Name: "taro"},
		{Id: "00001", Name: "jiro"},
		{Id: "00001", Name: "saburo"},
		{Id: "00001", Name: "shiro"},
		{Id: "00001", Name: "goro"},
		{Id: "00001", Name: "ROCK"},
	}

	for _, item := range items1 {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems1 = append(transactItems1, &dynamodb.TransactWriteItem{
			Put: &dynamodb.Put{
				TableName: aws.String(ddbTableName),
				Item:      itemav,
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems1,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems 1st: %s", err.Error()))
	}

	// 本番 5件 DeleteItem → PutItem する
	// DeleteItemを1件残して、PutItem の5件目が重複するようにしてエラーを発生させる
	// ConditionExpression を指定しないと、重複したら上書きされる
	items2 := []item{
		{Id: "00001", Name: "TARO"},
		{Id: "00001", Name: "JIRO"},
		{Id: "00001", Name: "SABURO"},
		{Id: "00001", Name: "SHIRO"},
		{Id: "00001", Name: "ROCK"},
	}

	for _, item := range items1[0:5] {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
			Delete: &dynamodb.Delete{
				TableName: aws.String(ddbTableName),
				Key:       itemav,
			},
		})
	}
	for _, item := range items2 {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
			Put: &dynamodb.Put{
				TableName:           aws.String(ddbTableName),
				Item:                itemav,
				ConditionExpression: aws.String("attribute_not_exists(id) and attribute_not_exists(#name)"),
				ExpressionAttributeNames: map[string]*string{
					"#name": aws.String("name"),
				},
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems2,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems 2nd: %s", err.Error()))
	}

	return nil
}

これも、実行すると Exception が発生し、途中の DeleteItem, PutItem はすべてなかったことになり、

キャプチャ4.png

の更新前の状態に戻る。

ちなみに、これ、00001:ROCK のレコードも含んで DeleteItem してから PutItem したら、

ValidationException: Transaction request cannot include multiple operations on one item
        status code: 400, request id: F855HQ4G5DCQ3J0NNLQETAO3JVVV4KQNSO5AEMVJF66Q9ASUAAJG

のExceptionが発生した。

開発者ガイドにある通り、

同じ TransactWriteItems オペレーション内の複数のアクションが同じ項目をターゲットとしているために、トランザクション検証エラーが発生した場合。

の条件に該当するためだ。

キー項目をまとめて変更しようとしている場合は、注意しよう。
同じキーを DeleteItem → PutItem するのではなく、UpdateItem を混ぜる必要があるということだ。

UpdateItem も試してみよう

ではUpdateItemはどうなるか。

func test005() error {
	var (
		transactItems1 []*dynamodb.TransactWriteItem
		transactItems2 []*dynamodb.TransactWriteItem
	)

	// 準備 5件 PutItem する
	items := []item{
		{Id: "00001", Name: "taro"},
		{Id: "00001", Name: "jiro"},
		{Id: "00001", Name: "saburo"},
		{Id: "00001", Name: "shiro"},
		{Id: "00001", Name: "goro"},
	}

	for _, item := range items {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems1 = append(transactItems1, &dynamodb.TransactWriteItem{
			Put: &dynamodb.Put{
				TableName: aws.String(ddbTableName),
				Item:      itemav,
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems1,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems 1st: %s", err.Error()))
	}

	// 本番 5件 UpdateItem する(属性の追加)
	for _, item := range items {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
			Update: &dynamodb.Update{
				TableName:        aws.String(ddbTableName),
				Key:              itemav,
				UpdateExpression: aws.String("SET age = :age"),
				ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
					":age": {
						S: aws.String("20"),
					},
				},
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems2,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems 2nd: %s", err.Error()))
	}

	return nil
}

この例では、一度 PutItem して作成したデータに UpdateItem で age の属性を付け加えている。
UpdateItemでも、キーは DeleteItem 同様に Key 属性に情報を渡そう。

                UpdateExpression: aws.String("SET age = :age"),

の部分は、RDB の SQL で言うところのSET句と同等と考えれば良い。

さて、これを動作させると、実行結果は

キャプチャ5.png

となり、属性の追加が成功した。

UpdateItem が途中で失敗したら?

↑の例で、UpdateItem するときにキーの不一致となる情報を入れてみた。

func test006() error {
	var (
		transactItems1 []*dynamodb.TransactWriteItem
		transactItems2 []*dynamodb.TransactWriteItem
	)

	// 準備 5件 PutItem する
	items1 := []item{
		{Id: "00001", Name: "taro"},
		{Id: "00001", Name: "jiro"},
		{Id: "00001", Name: "saburo"},
		{Id: "00001", Name: "shiro"},
		{Id: "00001", Name: "goro"},
	}

	for _, item := range items1 {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems1 = append(transactItems1, &dynamodb.TransactWriteItem{
			Put: &dynamodb.Put{
				TableName: aws.String(ddbTableName),
				Item:      itemav,
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems1,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems 1st: %s", err.Error()))
	}

	// 本番 5件 UpdateItem する(属性の追加)
	// 5件目に、準備で入れていないデータを指定してエラーを発生させる
	// ConditionExpression を設定しないと、Updateでもアイテムが挿入される
	items2 := []item{
		{Id: "00001", Name: "taro"},
		{Id: "00001", Name: "jiro"},
		{Id: "00001", Name: "saburo"},
		{Id: "00001", Name: "shiro"},
		{Id: "00001", Name: "rock"},
	}

	for _, item := range items2 {
		itemav, _ := dynamodbattribute.MarshalMap(item)
		transactItems2 = append(transactItems2, &dynamodb.TransactWriteItem{
			Update: &dynamodb.Update{
				TableName:           aws.String(ddbTableName),
				Key:                 itemav,
				ConditionExpression: aws.String("attribute_exists(id) and attribute_exists(#name)"),
				UpdateExpression:    aws.String("SET age = :age"),
				ExpressionAttributeNames: map[string]*string{
					"#name": aws.String("name"),
				},
				ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
					":age": {
						S: aws.String("20"),
					},
				},
			},
		})
	}

	if _, err := ddb.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: transactItems2,
	}); err != nil {
		return errors.New(fmt.Sprintf("Failed to TransactWriteItems 2nd: %s", err.Error()))
	}

	return nil
}

もうお分かりだろうが、これを実行すると、Exception が発生し、

キャプチャ6.png

と、属性の追加はすべてロールバックされた。

結論

ということで、ちょっと癖があって注意しなければいけないケースはあるものの、良い感じにトランザクション処理が可能であることが分かった。

しかしこれ、LambdaやDynamoDBのバックエンドで動いているHW故障時とかもキレイに戻ってくれるものなのかな……。その辺は不安。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?