LoginSignup
16

More than 3 years have passed since last update.

posted at

updated at

DynamoDBでのトランザクションを実際に使って考えてみる。

背景

AWS使ってサーバーレスで自分用の家計簿的なwebサービスを勉強も兼ねて開発中。消費情報を登録する部分を作り、最近それに加えて残金管理を出来るようにした。
そうなると1つの消費情報登録で、複数のテーブルに対して更新を行う事になる。整合性を保つために、トランザクション制御を行いたい。

防ぎたいケース

  • 片方のテーブル更新後、別テーブルの更新に失敗による不整合データ発生
  • 複数人同時処理時、更新前データ取得による不正値更新(自分しか使わないけど)

DynamoDBの特性復習

DynamoDBのトランザクション機能概要

各種サンプルページ、紹介ページから以下の機能が読み取れる。

  • TransactWriteItems メソッドで複数更新処理のトランザクション実行が可能
  • TransactGetItemsRequest メソッドで複数読み込み処理がトランザクションとして可能(ある瞬間の複数テーブルの情報が必要な時に使うと思われ)
  • トランザクション処理の中で、ConditionCheck 処理を加える事が出来る様子。
  • 各処理で、withConditionExpression オプション(?)により、実行する条件を指定する事が出来る。

前準備

Local DynamoDB導入

色々試すので、課金発生しない様にローカルDynamoDBで。Ubuntu18.06の仮想環境立ててその中でローカルDynamoDB立ててみる。

aws-cliなど前準備
sudo apt install curl
sudo apt-get install openjdk-8-jre
sudo apt install python-pip python3-pip
sudo pip3 install awscli --upgrade --user
sudo apt install awscli
curl "https://d1vvhvl2y92vvt.cloudfront.net/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

※aws2でないとPAY_PER_REQUEST が使えなかった為。ネット例だと作れてるのに・・・何故?

公式ページ 通りにローカルDynamoDBインストール

手順中のawsConfigureでのコマンドライン入力
AWS Access Key ID [None]: fakeMyKeyId
AWS Secret Access Key [None]: fakeSecretAccessKey
Default region name [None]: ap-northeast-1
Default output format [None]:  
確認
$ aws dynamodb list-tables --endpoint-url http://localhost:8000
{
    "TableNames": []
}

成功した様子。

テスト用テーブル作成

残金テーブル
$ aws2 dynamodb create-table --table-name account_balance \
 --attribute-definitions \
   AttributeName=tgt_date,AttributeType=S \
   AttributeName=method_cd,AttributeType=S \
 --key-schema \
   AttributeName=tgt_date,KeyType=HASH \
   AttributeName=method_cd,KeyType=RANGE \
 --billing-mode PAY_PER_REQUEST \
 --endpoint-url http://localhost:8000 --region ap-northeast-1

伝票テーブル
$ aws2 dynamodb create-table --table-name account_slip \
 --attribute-definitions \
   AttributeName=tgt_date,AttributeType=S \
   AttributeName=kind_cd_seq,AttributeType=S \
 --key-schema \
   AttributeName=tgt_date,KeyType=HASH \
   AttributeName=kind_cd_seq,KeyType=RANGE \
 --billing-mode PAY_PER_REQUEST \
 --endpoint-url http://localhost:8000 --region ap-northeast-1

ここでaws2を使っているのは、以下の現象があった為。

最初に出たエラー
aws: error: the following arguments are required: --provisioned-throughput
試しに--provisioned-throughputも指定したら出てきたエラー
Unknown options: --billing-mode, PAY_PER_REQUEST

テスト用データ作成

作成コマンド
aws dynamodb put-item --table-name account_balance \
 --endpoint-url http://localhost:8000 --region ap-northeast-1 \
 --item '{ "tgt_date": { "S": "20191228" }, "method_cd": { "S": "cash" }, "value": { "N": "20000" } }'
aws dynamodb put-item --table-name account_balance \
 --endpoint-url http://localhost:8000 --region ap-northeast-1 \
 --item '{ "tgt_date": { "S": "20191228" }, "method_cd": { "S": "suica" }, "value": { "N": "10000" } }'
aws dynamodb put-item --table-name account_balance \
 --endpoint-url http://localhost:8000 --region ap-northeast-1 \
 --item '{ "tgt_date": { "S": "20191228" }, "method_cd": { "S": "nanaco" }, "value": { "N": "5000" } }'


データ確認結果
aws dynamodb scan --table-name account_balance --endpoint-url http://localhost:8000 --region ap-northeast-1 
{
    "ConsumedCapacity": null,
    "ScannedCount": 3,
    "Count": 3,
    "Items": [
        {
            "method_cd": {
                "S": "cash"
            },
            "value": {
                "N": "20000"
            },
            "tgt_date": {
                "S": "20191228"
            }
        },
        {
            "method_cd": {
                "S": "nanaco"
            },
            "value": {
                "N": "5000"
            },
            "tgt_date": {
                "S": "20191228"
            }
        },
        {
            "method_cd": {
                "S": "suica"
            },
            "value": {
                "N": "10000"
            },
            "tgt_date": {
                "S": "20191228"
            }
        }
    ]
}


登録されてる。

UIモジュール導入

DynamoDB Localの導入 を参考に。


sudo apt update
sudo apt install nodejs npm
sudo npm install dynamodb-admin -g
export DYNAMO_ENDPOINT=http://localhost:8000
dynamodb-admin

テスト開発準備

pip3 install boto3

あと、ソース編集用にPyCharmインストール。

pythonで、boto3使ってローカルDynamoDBデータ確認
ローカルDynamoDB確認python
import json
import boto3
import logging
import datetime
import decimal
import dateutil.parser

from boto3.dynamodb.conditions import Key, Attr

class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)

def main():
    logging.basicConfig(level=logging.INFO)

    dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')
    SAFECNT = 50
    table_name = 'account_balance'
    tgtDate = '20191228'
    primary_key = {'tgt_date': tgtDate}
    dynamotable = dynamodb.Table(table_name)

    wkres = dynamotable.query(KeyConditionExpression=Key('tgt_date').eq(tgtDate))
    wkitems = wkres['Items']
    # org_len = len(wkitems)

    date_to = dateutil.parser.parse(tgtDate)
    wkchk = 0
    wkloop = date_to
    while len(wkitems) == 0 and wkchk <= SAFECNT :
        wkloop = wkloop + datetime.timedelta(days=-1)
        wkres = dynamotable.query(KeyConditionExpression=Key('tgt_date').eq(wkloop.strftime("%Y%m%d")))
        wkitems = wkres['Items']
        SAFECNT += 1

    logging.info('Result body: ' + json.dumps(wkitems, cls=DecimalEncoder, indent=0))

main()

現在のロジック

トランザクション的に一番激しい処理は、既存伝票データの日付や金額を変更 する処理(キーが変わる)。新規情報と更新前情報の両方が処理に渡される。

  1. 新規伝票データをDynamoDBに登録
  2. 既存伝票データをDynamoDBから削除
  3. 前日(無かったら遡り)の残金情報をDynamoDBから取得
  4. 伝票データ日付の全伝票データをDynamoDBから取得して計算した残金情報をDynamoDBへ更新(もしくは登録)

問題のある部分

  • 1,2ステップの処理で伝票データが変わるが、それを4ステップで再取得してる(強い読み取り整合性を使う必要がある)。3,4ステップは全体再計算の様な時に使うロジック。それを通常データ処理で使ってる(はい、サボりました)。
  • 2ステップ以降で何らかのエラーが発生したらデータ不整合が発生する。
  • 3と4ステップの間に別処理により残金情報が更新されたら不整合が発生する。
  • 別処理などによる既存伝票データの変更(削除含む)があった場合、データ不整合が発生する。

解消するために必要な事

  • その日の全伝票データをDynamoDBから取ってくるのでなく、処理に渡されてくる新旧伝票データのみで処理するように変更。トランザクション以前の問題。
  • 伝票データと残額データの更新処理を1トランザクションとして扱う。DynamoDBのトランザクション機能で解消できるはず。
  • 3ステップの残額データ取得時に残額データの読み取りロックが必要。「DynamoDBのトランザクションについてFAQ形式で答えてみる 」によると、DynamoDBのトランザクションでも、ロックはされないらしい。但し、例外エラーは返ってくるらしい。エラーになる様な時は最初からのやり直しが必要な状況。エラーでも問題ない。DynamoDBのトランザクション機能で解消できるはず。
  • 画面表示されたデータが、その読み込み時と、更新処理実行時に、別処理によって変わっていない事をチェックする。これも、ConditionCheck で対応できそう。

基本技術テスト

DynamoDBのトランザクションを試してみた #reinvent を参考に、自分のプログラムで使ってるテーブル構造に当てはめて色々実験。追加、更新、削除、それらの組み合わせ、データ取得といった所の実験。

pythonで、ローカルDynamoDBトランザクション色々試したソース
基本技術確認テストpython
import json
import boto3
import logging
import datetime
import decimal
import dateutil.parser

from boto3.dynamodb.conditions import Key, Attr

dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')
client = boto3.client('dynamodb', endpoint_url='http://localhost:8000')

class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)

def test():
    logging.basicConfig(level=logging.INFO)

    SAFECNT = 50
    table_name = 'account_balance'
    tgtDate = '20191228'
    primary_key = {'tgt_date': tgtDate}
    dynamotable = dynamodb.Table(table_name)

    wkres = dynamotable.query(KeyConditionExpression=Key('tgt_date').eq(tgtDate))
    wkitems = wkres['Items']
    # org_len = len(wkitems)

    date_to = dateutil.parser.parse(tgtDate)
    wkchk = 0
    wkloop = date_to
    while len(wkitems) == 0 and wkchk <= SAFECNT :
        wkloop = wkloop + datetime.timedelta(days=-1)
        wkres = dynamotable.query(KeyConditionExpression=Key('tgt_date').eq(wkloop.strftime("%Y%m%d")))
        wkitems = wkres['Items']
        SAFECNT += 1

    logging.info('Result body: ' + json.dumps(wkitems, cls=DecimalEncoder, indent=0))

def get_putslip_transaction_item():
    table_name = 'account_slip'
    retmap = {
        'Put': {
            'TableName': table_name,
            'Item': {
                "tgt_date": { "S": "20191228" },
                "kind_cd_seq": { "S": "food01_uuid0001" },
                "method_cd": { "S": "cash" },
                "value": { "N": "120" }
            },
            'ConditionExpression': 'attribute_not_exists(tgt_date) and attribute_not_exists(kind_cd_seq)'
        }
    }
    return retmap

def get_updateslip_transaction_item():

    table_name = 'account_slip'
    retmap = {
        'Update': {
            'TableName': table_name,
            'Key': {
                "tgt_date": { "S": "20191228" },
                "kind_cd_seq": { "S": "food02_uuid0001" }
            },
            'ConditionExpression': '#tgt_date = :tgt and #kind_cd_seq = :kindseq and #value = :befval',
            'UpdateExpression': 'SET #value = :aftval, #method_cd = :method_cd, #memo = :memo',
            'ExpressionAttributeNames' : {
                '#tgt_date' : 'tgt_date',
                '#kind_cd_seq' : 'kind_cd_seq',
                '#value' : 'value',
                '#method_cd': 'method_cd',
                '#memo': 'memo'
            },
            'ExpressionAttributeValues': {
                ':tgt': {'S': '20191228'},
                ':kindseq': {'S': 'food02_uuid0001'},
                ':aftval' : {'N': '150'},
                ':befval': {'N': '160'},
                ':method_cd': {'S': 'cash'},
                ':memo': {'S': 'memo'}
            }
        }
    }
    return retmap

def get_putdelslip_transaction_items():
    table_name = 'account_slip'
    items = []
    putitem = {
        'Put': {
            'TableName': table_name,
            'Item': {
                "tgt_date": { "S": "20191228" },
                "kind_cd_seq": { "S": "food02_uuid0001" },
                "method_cd": { "S": "cash" },
                "value": { "N": "160" }
            },
            'ConditionExpression': 'attribute_not_exists(tgt_date) and attribute_not_exists(kind_cd_seq)'
        }
    }
    delitem = {
        'Delete': {
            'TableName': table_name,
            'Key': {
                "tgt_date": { "S": "20191228" },
                "kind_cd_seq": { "S": "food01_uuid0001" }
            },
            'ConditionExpression': '#tgt_date = :tgt and #kind_cd_seq = :kindseq and #value = :befval',
            'ExpressionAttributeNames' : {
                '#tgt_date' : 'tgt_date',
                '#kind_cd_seq' : 'kind_cd_seq',
                '#value' : 'value'
            },
            'ExpressionAttributeValues': {
                ':tgt': {'S': '20191228'},
                ':kindseq': {'S': 'food01_uuid0001'},
                ':befval': {'N': '150'}
            }
        }
    }
    items.append(putitem)
    items.append(delitem)
    return items

def get_balance_transaction_items():
    table_name = 'account_balance'
    return [
        {
            'Get': {
                'TableName': table_name,
                'Key': {
                    "tgt_date": {"S": "20191228"},
                    "method_cd": {"S": "suica"}
                }
            }
        },
        {
            'Get': {
                'TableName': table_name,
                'Key': {
                    "tgt_date": {"S": "20191228"},
                    "method_cd": {"S": "cash"}
                }
            }
        }
    ]

def convert_dynamodata_to_map(dynamodatalist):
    retlist = []
    for dynamodata in dynamodatalist:
        onedata = {}
        wkitem = dynamodata['Item']
        for key in wkitem.keys():
            valobj = wkitem[key]
            for typkey in valobj.keys():
                if typkey == 'N':
                    onedata[key] = int(valobj[typkey])
                else:
                    onedata[key] = valobj[typkey]
        retlist.append(onedata)

    return retlist

def transact_slip_item():
    try:
        items=[]
        #items.append(get_putslip_transaction_item())
        items.append(get_updateslip_transaction_item())
        #items.extend(get_putdelslip_transaction_items())
        #items=get_putdelslip_transaction_items()
        response = client.transact_write_items(
            ReturnConsumedCapacity='INDEXES',
            TransactItems=items
        )
    except Exception as e:
        print('transact_write_items exception: {}'.format(e))
        raise

def transact_get_item():
    try:
        response = client.transact_get_items(
            TransactItems = get_balance_transaction_items()
        )
        print('transact_get_items success: {}'.format(convert_dynamodata_to_map(response['Responses'])))
    except Exception as e:
        print('transact_get_items exception: {}'.format(e))
        raise

#transact_get_item()
transact_slip_item()
#test()

詳細な説明は紹介したページにお任せするとして、状況に応じて以下のようなエラーが出た。

ConditionExpressionの条件にマッチしなかった場合(別処理で更新された事想定)
transact_write_items exception: An error occurred (TransactionCanceledException) when calling the TransactWriteItems operation: Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]
一つのトランザクションアイテムに複数処理入れてみた場合(単にどうなるか試しただけ)
transact_write_items exception: An error occurred (ValidationException) when calling the TransactWriteItems operation: TransactItems can only contain one of Check, Put, Update or Delete
複数のトランザクションアイテムの2つ目のConditionExpressionが条件にマッチしなかった場合(わざと一部の処理失敗時の動作確認)
transact_write_items exception: An error occurred (TransactionCanceledException) when calling the TransactWriteItems operation: Transaction cancelled, please refer cancellation reasons for specific reasons [None, ConditionalCheckFailed]

ちゃんと複数処理時に、失敗した時はRollbackしてくれてた。
本当は、複数スレッド作って同時処理コンフリクトも試してみたかった。が、トランザクションアイテムを複数作って、それを引数として実行する形。わざと1つ目の処理後にウェイト入れるとか出来なさそうなので諦めた。

ポイント

  • 登録時は、別の処理で同キーデータが登録されていない事をチェック
登録時TransactItem例
{
    'Put': {
        'TableName': table_name,
        'Item': {
            'tgt_date' { 'S': '20191228' },
            'kind_cd_seq': { 'S': 'food02_uuid0001' },
            'method_cd': { 'S': 'cash' },
            'value': { 'N': '160' }
        },
        'ここでチェック': '同一キーが無い事が条件',
        'ConditionExpression': 'attribute_not_exists(tgt_date) and attribute_not_exists(kind_cd_seq)'
    }
}
  • 更新時は、別の処理で値が変更されていない事をチェック
更新時TransactItem例
{
    'Update': {
        'TableName': table_name,
        'Key': {
            'tgt_date': { 'S': '20191228' },
            'kind_cd_seq': { 'S': 'food02_uuid0001' }
        },
        'ここでチェック1': '現在の値が想定する値(画面読み込み時の値など)である事が条件',
        'ここでチェック2': '本来は専用フィールド設けてuuidなど更新毎に変わる値をチェックする方が良い',
        'ConditionExpression': '#tgt_date = :tgt and #kind_cd_seq = :kindseq and #value = :befval',
        '更新する値指定': '実際に更新する値',
        'UpdateExpression': 'SET #value = :aftval, #method_cd = :method_cd, #memo = :memo',
        'フィールド名エイリアス指定': 'エイリアスと実際のフィールド名セット',
        'ExpressionAttributeNames' : {
            '#tgt_date' : 'tgt_date',
            '#kind_cd_seq' : 'kind_cd_seq',
            '#value' : 'value',
            '#method_cd': 'method_cd',
            '#memo': 'memo'
        },
        'パラメーター': 'バインドパラメーター値指定',
        'ExpressionAttributeValues': {
            ':tgt': {'S': '20191228'},
            ':kindseq': {'S': 'food02_uuid0001'},
            ':aftval' : {'N': '150'},
            ':befval': {'N': '160'},
            ':method_cd': {'S': 'cash'},
            ':memo': {'S': 'memo'}
        }
    }
}
  • 削除時は、別の処理で既に削除されていない事、別の値に更新されていない事をチェック
削除時TransactItem例
{
    'Delete': {
        'TableName': table_name,
        'Key': {
            "tgt_date": { "S": "20191228" },
            "kind_cd_seq": { "S": "food01_uuid0001" }
        },
        'ここでチェック1': '現在の値が想定する値(画面読み込み時の値など)である事が条件',
        'ここでチェック2': '既に削除されてる時でもちゃんとエラーになる',
        'ConditionExpression': '#tgt_date = :tgt and #kind_cd_seq = :kindseq and #value = :befval',
        'ExpressionAttributeNames' : {
            '#tgt_date' : 'tgt_date',
            '#kind_cd_seq' : 'kind_cd_seq',
            '#value' : 'value'
        },
        'ExpressionAttributeValues': {
            ':tgt': {'S': '20191228'},
            ':kindseq': {'S': 'food01_uuid0001'},
            ':befval': {'N': '150'}
        }
    }
}

※処理競合を防ぐためにちゃんと登録と更新を分ける。普通のputItemだと無かったら登録、あったら更新だが、トランザクション処理が必要なデータでは不整合データになる。

実際の処理に組み入れる時に余儀なくされた事

  • transaction_get_items で取得できるのはデータ型も含まれているデータ構造だった。色々扱うの面倒なので、convert_dynamodata_to_mapという関数作って強引に普通使いのデータ構造に修正した。
  • ロジックの変更
    • 1トランザクション処理中に一旦DBにデータ保持という手法は使えない。RDBでのアプリでもパフォーマンスの面から避けるべきな手法。
    • 処理に必要なデータは事前に取得しておき、そのデータでトランザクションを完結させる様にする。これもRDBでも本来しておくべき内容。
    • トランザクションチェックの為に、更新する値だけでなく別更新されたかチェックに必要な処理前のデータ値も渡す様にする。
  • データフロー設計の変更
    • 残額情報は伝票データ更新時に足りない分を作っていたが、それを今回のトランザクション化に含めるロジックが大掛かりになってしまう恐れがあった。その日にデータ表示する時に事前に登録する形にした。これも同上。これによって「現在のロジック」のステップ3,4の処理に関係するロジックがシンプルになった。

実際に作成したメイン部分のソースはこちら

思った事

  • 処理がRollbackされるから、DynamoDBを使った開発にありがちな、開発時にミスって中途半端に更新されたデータを戻さなくて済む!!!条件ミスったりするともちろん駄目だけど、面倒は減るはず。
  • データの更新でエラーになって、結局原因はその当時に登録したデータ型だった(数字で登録すべきを文字で登録していたデータを更新しようとしていた)。DynamoDBは1つのカラムでもレコードによって別のデータ型を登録してしまう可能性あり。注意!!
  • 複数処理の一括化データ更新されたかのチェック条件をトランザクションアイテムに組み込む 機能によって、トランザクションが必要なケースのほとんどは対応できると思われる。すごい。
  • そういえば最近 Amazon は Oracle脱却を実現したという事だった。このトランザクション機能があればそれも夢ではないと納得した。
  • ただ、トランザクションに登録できるのは25アイテムと、RDBの様に大量データのトランザクションは不可能。ただ、本来RDBでもDBに負荷かかる(UNDO領域大量使用)のであまりよろしくない設計。※25アイテムの根拠公式ページ。 他のページでは10個と書かれてる所もある。拡張されたのかな?
  • きっとAmazon は Oracle 使いつつもロジックなどの設計最適化を行い、その後に置き換えたと思われる。RDBからの単純置き換えは、よっぽど既存設計がしっかりしていないと難しいはず。
  • 今回は使わなかったが、チェックのみのトランザクションアイテムも使える様子。
  • 同じく、更新時に直接の値でなく、既存値を含めた四則演算も指定できる様子。
  • DynamoDBのトランザクションについてFAQ形式で答えてみる にも書かれてますが、根本的にトランザクションが必要な部分を極力少なくした方が良いと思う。コスト減にもなるはず。

参考にさせてもらったページ

Condition Expressions - 公式ページ
DynamoDB トランザクションの例 - 公式ページ
transact-write-items - 公式ページ

開発時のDynamoDB環境としてDynamoDB Localを用いた際に行なったノウハウを公開します。
DynamoDB Localの導入
aws cli で DynamoDB を使う
DynamoDBのトランザクションについてFAQ形式で答えてみる
DynamoDBのトランザクションを試してみた #reinvent
PHPでDynamoDBのトランザクションを試してみた
【Python入門】初心者に最適!Pythonで簡単なタイマーを作成しよう
DynamoDBでデータを更新する際に使うUpdateExpressionについて一通りまとめてみた

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
16