8
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【メモ】AWS boto3 & dynamoDBチートシート

Posted at

1. はじめに

Python&Boto3経由でAWSのDynamoDBへアクセスする時によく使うメソッドたち(CRUD + COPY)をまとめてみました。

Github

2. Boto3メソッド

API

main.py
dynamodb = boto3.resource(
    "dynamodb",
    endpoint_url="http://localhost:8100",
    region_name=os.environ.get("REGION_NAME"),
    aws_access_key_id=os.environ.get("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
)

データ定義

今回のテーブルの定義とアイテムの定義はenv.yamlitem.yamlにて記載します。

env.yaml
# テーブルの定義
DynamoDB:
  TableName: TestTable
  AttributeDefinitions:
    - AttributeName: user_id
      AttributeType: S
    - AttributeName: count
      AttributeType: N
    - AttributeName: status
      AttributeType: N
  KeySchema:
    - AttributeName: user_id
      KeyType: HASH
    - AttributeName: count
      KeyType: RANGE
  GlobalSecondaryIndexes:
    - IndexName: user_count
      KeySchema:
        - AttributeName: user_id
          KeyType: HASH
        - AttributeName: status
          KeyType: RANGE
      Projection:
        ProjectionType: ALL
  BillingMode: PAY_PER_REQUEST
item.yaml
アイテムの定義
TestItem:
  - user_id: test_user_1
    count: 100
    status: 1

  - user_id: test_user_2
    count: 80
    status: 0

  - user_id: test_user_3
    count: 90
    status: 1

  - user_id: test_user_3
    count: 40
    status: 0

読み込み

main.py
with open("env.yaml", "r", encoding="utf-8") as f:
    params = yaml.safe_load(f)["DynamoDB"]
with open("item.yaml", "r", encoding="utf-8") as f:
    items = yaml.safe_load(f)["TestItem"]

C (作成)

テーブル作成

main.py
def create_table(dynamodb_resource: boto3.resource, params: dict):
    try:
        table = dynamodb_resource.create_table(**params)
        table.wait_until_exists()
        print(f"Table ({params['TableName']}) created.")
    except Exception as e:
        raise e

create_table(dynamodb, params)

アイテム作成

main.py
def put_item(dynamodb_resource: boto3.resource, table_name: str, items: dict):
    try:
        dynamodb_table = dynamodb_resource.Table(table_name)
    except Exception as e:
        raise e
    with dynamodb_table.batch_writer() as batch:
        for counter, item in enumerate(items):
            batch.put_item(Item=item)
    print(f"Items put: {counter+1}")

put_item(dynamodb, "TestTable", items)

R (読み込む)

テーブル一覧取得

main.py
def list_tables(dynamodb_resource: boto3.resource):
    tables = dynamodb_resource.tables.all()
    for table in tables:
        print(table)

list_tables(dynamodb)

テーブル定義取得

main.py
def get_table_schema(dynamodb_resource: boto3.resource, table_name: str):
    try:
        dynamodb_table = dynamodb_resource.Table(table_name)
    except Exception as e:
        raise e
    attrs = dynamodb_table.attribute_definitions
    schema = dynamodb_table.key_schema

get_table_schema(dynamodb, "TestTable")

スキャン(Scan)

main.py
def scan_table(dynamodb_resource: boto3.resource, table_name: str):
    try:
        table = dynamodb_resource.Table(table_name)
    except Exception as e:
        raise e

    response = table.scan()

    data = response["Items"]
    while "LastEvaluatedKey" in response:
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        data.extend(response["Items"])
    return data

scan_table(dynamodb, "TestTable")

クエリ(Query)

main.py
def query_table(
    dynamodb_resource: boto3.resource,
    table_name: str,
    options: dict,
):
    table = dynamodb_resource.Table(table_name)
    query_res = table.query(**options)
    data = query_res["Items"]
    print(data)

query_options = {
    # "Select": "COUNT",
    "KeyConditionExpression": Key("user_id").eq("test_user_3")
    & Key("count").eq(40),
    # "FilterExpression": Attr("status").eq(1),
}

query_table(
    dynamodb_resource=dynamodb, 
    table_name="TestTable", 
    options=query_options
)

U (更新)

main.py
def update_item(dynamodb_resource: boto3.resource, table_name: str, option: dict):
    try:
        dynamodb_table = dynamodb_resource.Table(table_name)
    except Exception as e:
        raise e
    dynamodb_table.update_item(**option)

update_option = {
    "Key": {"user_id": "test_user_2", "count": 100},
    "UpdateExpression": "set #attr1 = :_status",
    "ExpressionAttributeNames": {"#attr1": "status"},
    "ExpressionAttributeValues": {":_status": 404},
    "ReturnValues": "UPDATED_NEW",
}

update_item(
    dynamodb_resource=dynamodb, 
    table_name="TestTable", 
    option=update_option
)

D (削除)

テーブル削除

main.py
def delete_table(dynamodb_resource: boto3.resource, table_name: str):
    try:
        table = dynamodb_resource.Table(table_name)
        table.delete()
        table.wait_until_not_exists()
        print(f"Table ({table_name}) deleted.")

    except Exception as e:
        raise e

delete_table(dynamodb, "TestTable")

全アイテム削除

main.py
def truncate_table(dynamodb_resource: boto3.resource, table_name: str):
    try:
        table = dynamodb_resource.Table(table_name)
    except Exception as e:
        raise e

    table_key_name = [key.get("AttributeName") for key in table.key_schema]

    projectionExpression = ", ".join("#" + key for key in table_key_name)
    expressionAttrNames = {"#" + key: key for key in table_key_name}

    counter = 0
    page = table.scan(
        ProjectionExpression=projectionExpression,
        ExpressionAttributeNames=expressionAttrNames,
    )
    with table.batch_writer() as batch:
        while page["Count"] > 0:
            counter += page["Count"]
            for itemKeys in page["Items"]:
                batch.delete_item(Key=itemKeys)
            if "LastEvaluatedKey" in page:
                page = table.scan(
                    ProjectionExpression=projectionExpression,
                    ExpressionAttributeNames=expressionAttrNames,
                    ExclusiveStartKey=page["LastEvaluatedKey"],
                )
            else:
                break
    print(f"Items deleted: {counter}")

truncate_table(dynamodb, "TestTable")

COPY

あるテーブルのアイテムを別のテーブルへコピーする。
:boom:注意:boom:
こちらはboto3.resourceではなく、boto3.clientを利用する。

main.py


def copy_table(
    src_dynamodb: boto3.client,
    src_table_name: str,
    dst_dynamodb: boto3.client,
    dst_table_name: str,
):
    dynamo_paginator = src_dynamodb.get_paginator("scan")

    dynamodb_response = dynamo_paginator.paginate(
        TableName=src_table_name,
        Select="ALL_ATTRIBUTES",
        ReturnConsumedCapacity="NONE",
        ConsistentRead=True,
    )

    for page in dynamodb_response:
        if page["Items"]:
            for count, item in enumerate(page["Items"]):
                dst_dynamodb.put_item(TableName=dst_table_name, Item=item)
            print(f"Items transfered: {count+1}")
        else:
            print("Original Table is empty.")
main.py
src_dynamodb_client = boto3.client(
        "dynamodb",
        endpoint_url="http://localhost:8100",
        region_name=os.environ.get("REGION_NAME"),
        aws_access_key_id=os.environ.get("AWS_ACCESS_KEY"),
        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
    )

dst_dynamodb_client = boto3.client(
        "dynamodb",
        endpoint_url="http://localhost:8100",
        region_name=os.environ.get("REGION_NAME"),
        aws_access_key_id=os.environ.get("AWS_ACCESS_KEY"),
        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
    )

copy_table(
    src_dynamodb_client, "TestTable", 
    dst_dynamodb_client, "TestTable2"
)

3. おわりに

自分がDynamoDBに対してよく使うメソッドをまとめてみました。
他のサービスに対してもまとめたいと思います。

8
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?