LoginSignup
1
0

More than 1 year has passed since last update.

boto3 cheat sheet

Last updated at Posted at 2022-03-25

Ref document

API ref
Code examples

Init Env

import os

import boto3

def set_aws_cfg(accesskey, secretkey, region="ap-northeast-1"):
    os.environ["AWS_ACCESS_KEY_ID"] = accesskey
    os.environ["AWS_SECRET_ACCESS_KEY"] = secretkey
    os.environ["AWS_DEFAULT_REGION"] = region

set_aws_cfg("XXXXXXXXXXXXXXXX", "XXXXXXXXXXXXXXXXXXXXXXXXX")

# show user name
print(boto3.client('sts').get_caller_identity())

Multithreading or multiprocessing

It's recommended to create a new Session object for each thread or process:

import boto3
import boto3.session
import threading

class MyTask(threading.Thread):
    def run(self):
        # Here we create a new session per thread
        session = boto3.session.Session()

        # Next, we create a resource client using our thread's session object
        s3 = session.resource('s3')

        # Put your thread-safe code here

DynamoDB

List all table
client = boto3.client('dynamodb')
for t in re['TableNames']:
  print(t)
  re = client.describe_table(TableName=t)
  print(re)
Read all items.
def readTable(table_name, fields="", TotalSegments=1, Segment=0):
    """Get all Table data and return by JSON format."""
    d0 = datetime.now()
    capacityUnits = 0
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table(table_name)
    if fields:
        response = table.scan(
            ReturnConsumedCapacity="TOTAL",
            ProjectionExpression=fields[0],
            ExpressionAttributeNames=fields[1],
            TotalSegments=TotalSegments,
            Segment=Segment,
        )
    else:
        response = table.scan(
            ReturnConsumedCapacity="TOTAL", TotalSegments=TotalSegments, Segment=Segment
        )
    data = response["Items"]
    capacityUnits += response.get("ConsumedCapacity", {}).get("CapacityUnits", 0)

    # レスポンスに LastEvaluatedKey が含まれなくなるまでループ処理を実行する
    while "LastEvaluatedKey" in response:
        if fields:
            response = table.scan(
                ExclusiveStartKey=response["LastEvaluatedKey"],
                ReturnConsumedCapacity="TOTAL",
                ProjectionExpression=fields[0],
                ExpressionAttributeNames=fields[1],
                TotalSegments=TotalSegments,
                Segment=Segment,
            )
        else:
            response = table.scan(
                ExclusiveStartKey=response["LastEvaluatedKey"],
                ReturnConsumedCapacity="TOTAL",
                TotalSegments=TotalSegments,
                Segment=Segment,
            )
        capacityUnits += response.get("ConsumedCapacity", {}).get("CapacityUnits", 0)
        data.extend(response["Items"])
    d1 = datetime.now()
    print(f"{table_name} ConsumedCapacity :{capacityUnits} on {d1-d0} seconds")

    return data
deletes a table ```py dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('Movies') table.delete() ```
clone table

Ref

def create_table(src_table, dst_table, client):

    # get source table and its schema
    try:
        table_schema = client.describe_table(TableName=src_table)["Table"]
    except client.exceptions.ResourceNotFoundException:
        sys.exit(1)
    except:
        sys.exit(1)

    # create keyword args for copy table
    keyword_args = {"TableName": dst_table}
    keyword_args['KeySchema'] = table_schema['KeySchema']
    keyword_args['AttributeDefinitions'] = table_schema['AttributeDefinitions']

    global_secondary_indexes = []
    local_secondary_indexes = []

    if table_schema.get("GlobalSecondaryIndexes"):
        for item in table_schema["GlobalSecondaryIndexes"]:
            index = {}
            for k, v in item.items():
                if k in ["IndexName", "KeySchema", "Projection", "ProvisionedThroughput"]:
                    if k == "ProvisionedThroughput":
                        # uncomment below to have same read/write capacity as original table
                        # for key in v.keys():
                        #     if key not in ["ReadCapacityUnits", "WriteCapacityUnits"]:
                        #         del v[key]

                        # comment below to have same read/write capacity as original table
                        index[k] = {"ReadCapacityUnits": 3, "WriteCapacityUnits": 1200}
                        continue
                    index[k] = v
            global_secondary_indexes.append(index)

    if table_schema.get("LocalSecondaryIndexes"):
        for item in table_schema["LocalSecondaryIndexes"]:
            index = {}
            for k, v in item.iteritems():
                if k in ["IndexName", "KeySchema", "Projection"]:
                    index[k] = v
            local_secondary_indexes.append(index)

    if global_secondary_indexes:
        keyword_args["GlobalSecondaryIndexes"] = global_secondary_indexes
    if local_secondary_indexes:
        keyword_args["LocalSecondaryIndexes"] = local_secondary_indexes

    # uncomment below to have same read/write capacity as original table
    # provisionedThroughput = table_schema['ProvisionedThroughput']
    # for key in provisionedThroughput.keys():
    #     if key not in ["ReadCapacityUnits", "WriteCapacityUnits"]:
    #         del provisionedThroughput[key]

    # keyword_args["ProvisionedThroughput"] = provisionedThroughput

    # comment below to have same read/write capacity as original table
    keyword_args["ProvisionedThroughput"] = {"ReadCapacityUnits": 3, "WriteCapacityUnits": 1200}

    if table_schema.get('StreamSpecification'):
        keyword_args['StreamSpecification'] = table_schema['StreamSpecification']

    # create copy table
    try:
        client.describe_table(TableName=dst_table)
        print("!!! Table {0} already exists. Exiting...".format(dst_table))
        sys.exit(0)
    except client.exceptions.ResourceNotFoundException:
        client.create_table(**keyword_args)

        print("*** Waiting for the new table {0} to become active".format(dst_table))
        sleep(5)

        while client.describe_table(TableName=dst_table)['Table']['TableStatus'] != 'ACTIVE':
            sys.stdout.write(spinner.next())
            sys.stdout.flush()
            sleep(0.1)
            sys.stdout.write('\b')
        print("*** New table {0} to is now active!".format(dst_table))

JS

window.download= function(data, filename, type) {
    var file = new Blob([data], {type: type});
    if (window.navigator.msSaveOrOpenBlob) // IE10+
        window.navigator.msSaveOrOpenBlob(file, filename);
    else { // Others
        var a = document.createElement("a"),
                url = URL.createObjectURL(file);
        a.href = url;
        a.download = filename;
        document.body.appendChild(a);
        a.click();
        setTimeout(function() {
            document.body.removeChild(a);
            window.URL.revokeObjectURL(url);  
        }, 0); 
    }
}

download(JSON.stringify( data,null,2),"slack.json","text")
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0