
【AWS】【DMS】How to export DMS table statistics as CSV with the AWS CLI, and automating the export with Lambda

Posted at 2022-09-15

TL;DR

  • Piping the output through `jq -r '(.TableStatistics[0]|to_entries|map(.key)),(.TableStatistics[]|[.[]])|@csv'` does the trick
aws dms describe-table-statistics --replication-task-arn arn:aws:dms:hogehoge --output json | jq -r '(.TableStatistics[0]|to_entries|map(.key)),(.TableStatistics[]|[.[]])|@csv'
"SchemaName","TableName","Inserts","Deletes","Updates","Ddls","FullLoadRows","FullLoadCondtnlChkFailedRows","FullLoadErrorRows","FullLoadStartTime","FullLoadEndTime","FullLoadReloaded","LastUpdateTime","TableState","ValidationPendingRecords","ValidationFailedRecords","ValidationSuspendedRecords","ValidationState"
"HOGE","FUGA_FUGA",0,0,0,0,163099998,0,0,"2022-09-15T18:59:08.573000+09:00","2022-09-15T19:17:27.794000+09:00",false,"2022-09-15T19:17:42.106000+09:00","Table completed",0,0,0,"Not enabled"
  • By combining EventBridge and Lambda, I built a setup that automatically exports the CSV to S3 once the DMS task finishes
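If jq is not at hand, the same flattening can be sketched in plain Python. The sample payload below is hypothetical, shaped like a trimmed `describe-table-statistics` response; note that jq's `@csv` quotes only strings, which `csv.QUOTE_NONNUMERIC` approximates here:

```python
import csv
import io

# Hypothetical sample shaped like the describe-table-statistics response
sample = {
    "TableStatistics": [
        {"SchemaName": "HOGE", "TableName": "FUGA_FUGA",
         "Inserts": 0, "FullLoadRows": 163099998,
         "TableState": "Table completed"},
    ]
}

def stats_to_csv(payload):
    rows = payload["TableStatistics"]
    buf = io.StringIO()
    # Use the first row's keys as the header, like the jq one-liner does
    writer = csv.DictWriter(buf, fieldnames=list(rows[0]),
                            quoting=csv.QUOTE_NONNUMERIC)
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()

print(stats_to_csv(sample))
```

Like the jq version, this derives the header from the first row, so it assumes every row has the same keys.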

Why is this needed?

From the console you can download the statistics as CSV right away, but the AWS CLI has no equivalent.


I had already automated the flow from running DMS through deleting the task after completion, and I wanted to add a step that saves the results as CSV before the deletion.

Reference

In fact, this article is mostly based on the following reference. Thank you 🙏

Automating with EventBridge / Lambda

While we're at it, let's have the CSV uploaded to S3 automatically when the DMS task finishes.

Lambda

import os
import boto3
import json
from datetime import datetime
import csv
import pprint
from botocore.exceptions import ClientError
from botocore.config import Config

config = Config(
    retries={
        'max_attempts': 10,
        'mode': 'standard'
    }
)

s3_resource = boto3.resource('s3', config=config)
dms = boto3.client('dms', config=config)

FIELDNAMES = [
    "SchemaName",
    "TableName",
    "Inserts",
    "Deletes",
    "Updates",
    "Ddls",
    "AppliedInserts",
    "AppliedDeletes",
    "AppliedUpdates",
    "AppliedDdls",
    "FullLoadRows",
    "FullLoadCondtnlChkFailedRows",
    "FullLoadErrorRows",
    "FullLoadStartTime",
    "FullLoadEndTime",
    "FullLoadReloaded",
    "LastUpdateTime",
    "TableState",
    "ValidationPendingRecords",
    "ValidationFailedRecords",
    "ValidationSuspendedRecords",
    "ValidationState",
    "ValidationStateDetails",
]

def lambda_handler(event, context):
    pprint.pprint("Received event: " + json.dumps(event, indent=2))

    upload_s3_bucket = os.environ['UPLOAD_S3_BUCKET']
    temp = os.environ.get('TEMP','/tmp/')

    task_arn = event['resources'][0]
    tags = dms.list_tags_for_resource(
        ResourceArn=task_arn
    )['TagList']
    #pprint.pprint(tags)
    tag_name = [tag['Value'] for tag in tags if tag['Key'] == "Name"][0]

    tenant_account = tag_name.split('-')[0]
    landscape = tag_name.split('-')[1]
    task_type = tag_name.split('-')[3]
    print(tenant_account, landscape, task_type)

    csv_path = f"{temp}{task_type}.csv"
    table_statistics = dms.describe_table_statistics(
        ReplicationTaskArn=task_arn)
    #pprint.pprint(table_statistics)
    with open(csv_path, 'w', newline="") as csv_file:
        writer = csv.DictWriter(
            csv_file, fieldnames=FIELDNAMES, quoting=csv.QUOTE_ALL,
            extrasaction='ignore')  # skip any fields the API returns beyond FIELDNAMES

        writer.writeheader()
        writer.writerows(table_statistics['TableStatistics'])
        marker = table_statistics.get('Marker')
        while marker:
            table_statistics = dms.describe_table_statistics(
                ReplicationTaskArn=task_arn, Marker=marker)
            writer.writerows(table_statistics['TableStatistics'])
            marker = table_statistics.get('Marker')

    upload_s3_path = f"{tenant_account}/{landscape}/DMS/{datetime.today().strftime('%Y%m%d-%H%M%S')}/{task_type}.csv"
    #print(upload_s3_path)
    s3_bucket = s3_resource.Bucket(upload_s3_bucket)
    s3_bucket.upload_file(
        Filename=csv_path,
        Key=upload_s3_path
    )

    print(f"UPLOAD SUCCESS: {upload_s3_path}")


if __name__ == "__main__":
    import sys
    os.environ['UPLOAD_S3_BUCKET'] = sys.argv[1]
    os.environ['TEMP'] = sys.argv[2]
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    with open('event.json') as f:
        event = json.load(f)
    context = {}
    lambda_handler(event, context)
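The handler above derives `tenant_account`, `landscape`, and `task_type` by splitting the task's Name tag on `-`, so it assumes a naming convention of the form `{tenant}-{landscape}-{something}-{tasktype}`. A minimal sketch of that parsing, with a purely hypothetical tag value:

```python
def parse_name_tag(tag_name):
    """Split a DMS task Name tag assuming a
    '{tenant}-{landscape}-{something}-{tasktype}' convention."""
    parts = tag_name.split('-')
    return parts[0], parts[1], parts[3]

# Hypothetical tag value following that convention
tenant, landscape, task_type = parse_name_tag("acme-prod-dms-fullload")
```

If your tags do not follow this convention, indices 0, 1, and 3 will point at the wrong segments (or raise `IndexError`), so adjust the split accordingly.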

EventBridge

According to the official documentation, catching DMS-EVENT-0078 and DMS-EVENT-0079 with EventBridge should let us invoke the Lambda on both successful completion and failure.

AWSTemplateFormatVersion: '2010-09-09'
Description: CloudFormation template for EventBridge rule 'output-dms-result-csv-test'
Resources:
  EventRule0:
    Type: AWS::Events::Rule
    Properties:
      EventBusName: default
      EventPattern:
        source:
          - aws.dms
        detail-type:
          - DMS Replication Task State Change
        detail:
          type:
            - REPLICATION_TASK
          eventId:
            - DMS-EVENT-0078
            - DMS-EVENT-0079
      Name: output-dms-result-csv-test
      State: ENABLED
      Targets:
        - Id: Ideae1c249-516d-4c4a-8bb9-6c40d87aa383
          Arn: >-
            arn:aws:lambda:ap-northeast-1:******:function:output-dms-result-csv-test
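For reference, an event matching this rule should look roughly like the hypothetical payload below (ARN and account ID are made up); the Lambda only relies on `resources[0]` being the replication task ARN:

```python
# Hypothetical EventBridge event for a DMS task state change
event = {
    "source": "aws.dms",
    "detail-type": "DMS Replication Task State Change",
    "resources": ["arn:aws:dms:ap-northeast-1:123456789012:task:EXAMPLE"],
    "detail": {
        "type": "REPLICATION_TASK",
        "eventId": "DMS-EVENT-0078",
    },
}

# This is exactly how lambda_handler picks up the task ARN
task_arn = event["resources"][0]
```

Don't forget that the Lambda also needs a resource-based policy allowing `events.amazonaws.com` to invoke it; EventBridge adds this automatically when you wire the target up in the console, but not when you create the rule via CloudFormation as above.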
