1. はじめに
Python&Boto3経由でAWSのDynamoDBへアクセスする時によく使うメソッドたち(CRUD + COPY)をまとめてみました。
2. Boto3メソッド
API
main.py
# DynamoDB service resource used by the resource-based examples in this file.
# The endpoint is fixed to http://localhost:8100 (presumably a locally running
# DynamoDB -- confirm); region and credentials come from environment
# variables and are passed as None when a variable is not set.
dynamodb = boto3.resource(
    "dynamodb",
    endpoint_url="http://localhost:8100",
    region_name=os.getenv("REGION_NAME"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)
データ定義
今回のテーブルの定義とアイテムの定義はenv.yaml
とitem.yaml
にて記載します。
env.yaml
# Table definition. Everything under `DynamoDB` is loaded by main.py and
# passed as keyword arguments to create_table().
DynamoDB:
  TableName: TestTable
  AttributeDefinitions:
    - AttributeName: user_id
      AttributeType: S
    - AttributeName: count
      AttributeType: N
    - AttributeName: status
      AttributeType: N
  # Primary key: user_id (HASH) + count (RANGE).
  KeySchema:
    - AttributeName: user_id
      KeyType: HASH
    - AttributeName: count
      KeyType: RANGE
  GlobalSecondaryIndexes:
    # NOTE(review): the index is named user_count but its RANGE key is
    # `status`, not `count` -- confirm the name is intended.
    - IndexName: user_count
      KeySchema:
        - AttributeName: user_id
          KeyType: HASH
        - AttributeName: status
          KeyType: RANGE
      Projection:
        ProjectionType: ALL
  BillingMode: PAY_PER_REQUEST
item.yaml
# Item definitions. Each list entry is one item; main.py loads the list under
# `TestItem` and writes every entry to the table.
TestItem:
  - user_id: test_user_1
    count: 100
    status: 1
  - user_id: test_user_2
    count: 80
    status: 0
  # test_user_3 appears twice with different `count` values.
  - user_id: test_user_3
    count: 90
    status: 1
  - user_id: test_user_3
    count: 40
    status: 0
読み込み
main.py
# Load the table definition: the mapping stored under "DynamoDB" in env.yaml.
with open("env.yaml", encoding="utf-8") as f:
    env_config = yaml.safe_load(f)
params = env_config["DynamoDB"]

# Load the sample items: the list stored under "TestItem" in item.yaml.
with open("item.yaml", encoding="utf-8") as f:
    item_config = yaml.safe_load(f)
items = item_config["TestItem"]
C (作成)
テーブル作成
main.py
def create_table(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    params: dict,
) -> None:
    """Create a DynamoDB table and block until it exists.

    Args:
        dynamodb_resource: DynamoDB service resource, i.e. the object
            returned by ``boto3.resource("dynamodb", ...)``.
        params: Keyword arguments forwarded unchanged to the resource's
            ``create_table``. Must contain ``"TableName"``, which is also
            used in the confirmation message.

    Raises:
        Any exception raised by ``create_table`` or by the waiter propagates
        unchanged; nothing is caught here.
    """
    table = dynamodb_resource.create_table(**params)
    # Wait so that callers can use the table as soon as this function returns.
    table.wait_until_exists()
    print(f"Table ({params['TableName']}) created.")
# Create the table described by the "DynamoDB" section of env.yaml.
create_table(dynamodb_resource=dynamodb, params=params)
アイテム作成
main.py
def put_item(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
    items: list,
) -> None:
    """Write every item in *items* to the table through a batch writer.

    Args:
        dynamodb_resource: DynamoDB service resource, i.e. the object
            returned by ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the destination table.
        items: Iterable of item dicts; each one is passed as ``Item`` to the
            batch writer's ``put_item``. May be empty.
    """
    dynamodb_table = dynamodb_resource.Table(table_name)

    # Initialised up front so that an empty `items` reports 0 instead of
    # failing on an unbound loop variable.
    written = 0
    with dynamodb_table.batch_writer() as batch:
        for written, item in enumerate(items, start=1):
            batch.put_item(Item=item)

    print(f"Items put: {written}")
# Insert the items loaded from item.yaml into TestTable.
put_item(dynamodb_resource=dynamodb, table_name="TestTable", items=items)
R (読み取り)
テーブル一覧取得
main.py
def list_tables(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
) -> None:
    """Print every table in the resource's ``tables`` collection, one per line.

    Args:
        dynamodb_resource: DynamoDB service resource, i.e. the object
            returned by ``boto3.resource("dynamodb", ...)``.
    """
    for table in dynamodb_resource.tables.all():
        print(table)
# Print the tables reachable through the shared resource.
list_tables(dynamodb_resource=dynamodb)
テーブル定義取得
main.py
def get_table_schema(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
) -> tuple:
    """Return the attribute definitions and key schema of a table.

    Args:
        dynamodb_resource: DynamoDB service resource, i.e. the object
            returned by ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table to inspect.

    Returns:
        A 2-tuple ``(attribute_definitions, key_schema)`` read from the
        table resource.
    """
    dynamodb_table = dynamodb_resource.Table(table_name)
    attrs = dynamodb_table.attribute_definitions
    schema = dynamodb_table.key_schema
    # Hand both values back; previously they were computed and then dropped.
    return attrs, schema
# Look up the attribute definitions and key schema of TestTable.
get_table_schema(dynamodb_resource=dynamodb, table_name="TestTable")
スキャン(Scan)
main.py
def scan_table(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
) -> list:
    """Scan a whole table and return all of its items.

    Keeps calling ``scan`` while the response carries a
    ``LastEvaluatedKey``, passing it back as ``ExclusiveStartKey``.

    Args:
        dynamodb_resource: DynamoDB service resource, i.e. the object
            returned by ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table to scan.

    Returns:
        The concatenated ``"Items"`` of every page, in page order.
    """
    table = dynamodb_resource.Table(table_name)

    collected = []
    scan_kwargs = {}
    while True:
        page = table.scan(**scan_kwargs)
        collected.extend(page["Items"])
        if "LastEvaluatedKey" not in page:
            return collected
        # Resume the next scan right after the last key of this page.
        scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]
# Read every item of TestTable (the returned list is not used here).
scan_table(dynamodb_resource=dynamodb, table_name="TestTable")
クエリ(Query)
main.py
def query_table(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
    options: dict,
) -> list:
    """Run a query against a table, print the matching items and return them.

    Args:
        dynamodb_resource: DynamoDB service resource, i.e. the object
            returned by ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table to query.
        options: Keyword arguments forwarded to the table's ``query``
            (e.g. ``KeyConditionExpression``). The dict is not modified.

    Returns:
        The ``"Items"`` of the query. When *options* has no ``"Limit"``,
        follow-up pages signalled by ``LastEvaluatedKey`` are fetched and
        appended as well; with a ``"Limit"`` only the first page is returned,
        as before.
    """
    table = dynamodb_resource.Table(table_name)
    query_res = table.query(**options)
    data = list(query_res["Items"])

    # Without an explicit Limit, a LastEvaluatedKey means the result was cut
    # short, so keep reading. With a Limit the caller asked for one bounded
    # page, so pagination is left to them.
    while "LastEvaluatedKey" in query_res and "Limit" not in options:
        next_options = {
            **options,
            "ExclusiveStartKey": query_res["LastEvaluatedKey"],
        }
        query_res = table.query(**next_options)
        data.extend(query_res["Items"])

    print(data)
    return data
# Key condition: user_id equals "test_user_3" AND count equals 40.
key_condition = Key("user_id").eq("test_user_3") & Key("count").eq(40)

query_options = {
    # "Select": "COUNT",
    "KeyConditionExpression": key_condition,
    # "FilterExpression": Attr("status").eq(1),
}

query_table(dynamodb, "TestTable", query_options)
U (更新)
main.py
def update_item(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
    option: dict,
) -> dict:
    """Update one item and return the service response.

    Args:
        dynamodb_resource: DynamoDB service resource, i.e. the object
            returned by ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table holding the item.
        option: Keyword arguments forwarded unchanged to the table's
            ``update_item`` (``Key``, ``UpdateExpression``, ...).

    Returns:
        Whatever the table's ``update_item`` returns, so that callers who
        pass ``ReturnValues`` can read the result.
    """
    dynamodb_table = dynamodb_resource.Table(table_name)
    return dynamodb_table.update_item(**option)
# Set `status` to 404 on the item keyed by (user_id, count). `status` is
# referenced through the #attr1 placeholder rather than by name.
# NOTE(review): item.yaml defines test_user_2 with count 80, so this key
# (count 100) matches none of the sample items -- confirm whether 100 is
# intended.
update_option = dict(
    Key={"user_id": "test_user_2", "count": 100},
    UpdateExpression="set #attr1 = :_status",
    ExpressionAttributeNames={"#attr1": "status"},
    ExpressionAttributeValues={":_status": 404},
    ReturnValues="UPDATED_NEW",
)

update_item(dynamodb, "TestTable", update_option)
D (削除)
テーブル削除
main.py
def delete_table(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
) -> None:
    """Delete a table and block until it no longer exists.

    Args:
        dynamodb_resource: DynamoDB service resource, i.e. the object
            returned by ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table to delete.

    Raises:
        Any exception raised by ``delete`` or by the waiter propagates
        unchanged; nothing is caught here.
    """
    table = dynamodb_resource.Table(table_name)
    table.delete()
    # Wait until the deletion has completed before reporting success.
    table.wait_until_not_exists()
    print(f"Table ({table_name}) deleted.")
# Drop TestTable and wait for the deletion to finish.
delete_table(dynamodb_resource=dynamodb, table_name="TestTable")
全アイテム削除
main.py
def truncate_table(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
) -> None:
    """Delete every item of a table while keeping the table itself.

    Scans the table projecting only its key attributes and deletes each
    returned key through a batch writer, following ``LastEvaluatedKey``
    until the scan is exhausted.

    Args:
        dynamodb_resource: DynamoDB service resource, i.e. the object
            returned by ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table to empty.
    """
    table = dynamodb_resource.Table(table_name)

    key_names = [key.get("AttributeName") for key in table.key_schema]
    # Key attributes are referenced through "#name" placeholders because
    # attribute names can collide with DynamoDB reserved words.
    scan_kwargs = {
        "ProjectionExpression": ", ".join("#" + name for name in key_names),
        "ExpressionAttributeNames": {"#" + name: name for name in key_names},
    }

    deleted = 0
    with table.batch_writer() as batch:
        while True:
            page = table.scan(**scan_kwargs)
            for item_key in page["Items"]:
                batch.delete_item(Key=item_key)
            deleted += len(page["Items"])

            # Only the absence of LastEvaluatedKey ends the scan; an empty
            # page by itself does not mean the table has been exhausted.
            if "LastEvaluatedKey" not in page:
                break
            scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]

    print(f"Items deleted: {deleted}")
# Remove all items from TestTable; the table itself is kept.
truncate_table(dynamodb_resource=dynamodb, table_name="TestTable")
COPY
あるテーブルのアイテムを別のテーブルへコピーする。
注意
こちらはboto3.resource
ではなく、boto3.client
を利用する。
main.py
def copy_table(
    src_dynamodb: "botocore.client.BaseClient",
    src_table_name: str,
    dst_dynamodb: "botocore.client.BaseClient",
    dst_table_name: str,
) -> None:
    """Copy every item of one table into another table.

    The source is read with the client's ``scan`` paginator and each item is
    written to the destination with ``put_item``. Both arguments are
    low-level clients (``boto3.client("dynamodb", ...)``), not resources.

    Args:
        src_dynamodb: Client used to scan the source table.
        src_table_name: Name of the table to read from.
        dst_dynamodb: Client used to write to the destination table.
        dst_table_name: Name of the table to write to.
    """
    paginator = src_dynamodb.get_paginator("scan")
    pages = paginator.paginate(
        TableName=src_table_name,
        Select="ALL_ATTRIBUTES",
        ReturnConsumedCapacity="NONE",
        ConsistentRead=True,
    )

    # Count across all pages so the report reflects the whole copy rather
    # than the size of a single page.
    transferred = 0
    for page in pages:
        for item in page["Items"]:
            dst_dynamodb.put_item(TableName=dst_table_name, Item=item)
            transferred += 1

    # An empty page in the middle of a scan does not mean the table is
    # empty, so the "empty" message is decided once, after the last page.
    if transferred:
        print(f"Items transfered: {transferred}")
    else:
        print("Original Table is empty.")
main.py
# Connection settings shared by the source and destination clients: both
# use the same endpoint, region and credentials.
client_settings = {
    "endpoint_url": "http://localhost:8100",
    "region_name": os.environ.get("REGION_NAME"),
    "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY"),
    "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
}

src_dynamodb_client = boto3.client("dynamodb", **client_settings)
dst_dynamodb_client = boto3.client("dynamodb", **client_settings)

# Copy all items of TestTable into TestTable2.
copy_table(
    src_dynamodb=src_dynamodb_client,
    src_table_name="TestTable",
    dst_dynamodb=dst_dynamodb_client,
    dst_table_name="TestTable2",
)
3. おわりに
自分がDynamoDB
に対してよく使うメソッドをまとめてみました。
他のサービスに対してもまとめたいと思います。