TL;DR
- `aws dms describe-table-statistics` の出力(JSON)をパイプで
jq -r '(.TableStatistics[0]|to_entries|map(.key)),(.TableStatistics[]|[.[]])|@csv'
とすればOK
# The jq filter first emits the keys of the first TableStatistics entry (the CSV header),
# then one array of values per entry; @csv renders each array as a CSV line and -r prints it raw.
# NOTE(review): columns only line up if every entry has the same keys in the same order as the first — confirm.
aws dms describe-table-statistics --replication-task-arn arn:aws:dms:hogehoge --output json | jq -r '(.TableStatistics[0]|to_entries|map(.key)),(.TableStatistics[]|[.[]])|@csv'
"SchemaName","TableName","Inserts","Deletes","Updates","Ddls","FullLoadRows","FullLoadCondtnlChkFailedRows","FullLoadErrorRows","FullLoadStartTime","FullLoadEndTime","FullLoadReloaded","LastUpdateTime","TableState","ValidationPendingRecords","ValidationFailedRecords","ValidationSuspendedRecords","ValidationState"
"HOGE","FUGA_FUGA",0,0,0,0,163099998,0,0,"2022-09-15T18:59:08.573000+09:00","2022-09-15T19:17:27.794000+09:00",false,"2022-09-15T19:17:42.106000+09:00","Table completed",0,0,0,"Not enabled"
- EventBridge / Lambdaを組み合わせることで、DMS完了後自動でS3にCSVを出力する仕組みを作りました
なぜ必要?
コンソール上ではテーブル統計のCSVダウンロードがすぐできるが、AWS CLIにはCSVで出力する機能がない。
DMS実行→完了後削除までを自動化しており、削除前に結果をCSVとして保存するフローも追加したかった。
参考
というかほぼこれです。ありがとうございます🙏
EventBridge / Lambdaで自動化
せっかくなのでDMSが終わったら自動でCSVがS3に上がるようにしてみましょう
Lambda
import os
import boto3
import json
from datetime import datetime
import csv
import pprint
from botocore.exceptions import ClientError
from botocore.config import Config
# Shared botocore settings for both clients below: "standard" retry mode,
# at most 10 attempts per API call.
config = Config(retries=dict(max_attempts=10, mode='standard'))

# Module-level AWS handles, created once at import time.
dms = boto3.client('dms', config=config)
s3_resource = boto3.resource('s3', config=config)
# Column names (and column order) of the exported CSV.
FIELDNAMES = [
    # Table identity
    "SchemaName", "TableName",
    # Change counters
    "Inserts", "Deletes", "Updates", "Ddls",
    "AppliedInserts", "AppliedDeletes", "AppliedUpdates", "AppliedDdls",
    # Full-load counters and timestamps
    "FullLoadRows", "FullLoadCondtnlChkFailedRows", "FullLoadErrorRows",
    "FullLoadStartTime", "FullLoadEndTime", "FullLoadReloaded",
    # Overall table status
    "LastUpdateTime", "TableState",
    # Validation counters and status
    "ValidationPendingRecords", "ValidationFailedRecords",
    "ValidationSuspendedRecords", "ValidationState", "ValidationStateDetails",
]
def _parse_task_name(tags):
    """Split the task's ``Name`` tag into (tenant_account, landscape, task_type).

    The tag value is split on ``-``; segments 0, 1 and 3 are returned
    (segment 2 is not used by this module).

    Raises:
        ValueError: if there is no ``Name`` tag or it has fewer than four
            ``-``-separated segments.
    """
    tag_name = next(
        (tag['Value'] for tag in tags if tag['Key'] == "Name"), None)
    if tag_name is None:
        raise ValueError("DMS replication task has no 'Name' tag")
    parts = tag_name.split('-')
    if len(parts) < 4:
        raise ValueError(
            f"Name tag {tag_name!r} must have at least 4 '-'-separated parts")
    return parts[0], parts[1], parts[3]


def lambda_handler(event, context):
    """Dump the table statistics of a DMS replication task to S3 as CSV.

    The task ARN is read from ``event['resources'][0]``. The task's ``Name``
    tag determines the S3 key:
    ``<tenant_account>/<landscape>/DMS/<YYYYmmdd-HHMMSS>/<task_type>.csv``.

    Environment variables:
        UPLOAD_S3_BUCKET: destination bucket name (required).
        TEMP: directory for the intermediate CSV file (default ``/tmp/``).

    Args:
        event: invocation event; only ``event['resources'][0]`` is read.
        context: Lambda context object (unused).

    Raises:
        KeyError: if ``UPLOAD_S3_BUCKET`` is not set or the event has no
            ``resources`` key.
        ValueError: if the task's ``Name`` tag is missing or malformed.
    """
    pprint.pprint("Received event: " + json.dumps(event, indent=2))
    upload_s3_bucket = os.environ['UPLOAD_S3_BUCKET']
    temp = os.environ.get('TEMP', '/tmp/')
    task_arn = event['resources'][0]

    tags = dms.list_tags_for_resource(ResourceArn=task_arn)['TagList']
    tenant_account, landscape, task_type = _parse_task_name(tags)
    print(tenant_account, landscape, task_type)

    # os.path.join tolerates a TEMP value with or without a trailing slash.
    csv_path = os.path.join(temp, f"{task_type}.csv")

    with open(csv_path, 'w', newline="", encoding="utf-8") as csv_file:
        # extrasaction='ignore': a statistics entry carrying a key that is not
        # listed in FIELDNAMES must not abort the export with ValueError
        # (the DictWriter default is extrasaction='raise').
        writer = csv.DictWriter(
            csv_file, fieldnames=FIELDNAMES, quoting=csv.QUOTE_ALL,
            extrasaction='ignore')
        writer.writeheader()

        # Follow the Marker returned by each page until no further page exists.
        request = {'ReplicationTaskArn': task_arn}
        while True:
            page = dms.describe_table_statistics(**request)
            writer.writerows(page['TableStatistics'])
            marker = page.get('Marker')
            if not marker:
                break
            request['Marker'] = marker

    timestamp = datetime.today().strftime('%Y%m%d-%H%M%S')
    upload_s3_path = (
        f"{tenant_account}/{landscape}/DMS/{timestamp}/{task_type}.csv")
    s3_resource.Bucket(upload_s3_bucket).upload_file(
        Filename=csv_path,
        Key=upload_s3_path,
    )
    print(f"UPLOAD SUCCESS: {upload_s3_path}")
if __name__ == "__main__":
    # Local run: argv[1] is the upload bucket, argv[2] the temp directory.
    # The event payload is read from event.json located next to this script.
    import sys

    bucket_arg, temp_arg = sys.argv[1], sys.argv[2]
    os.environ.update(UPLOAD_S3_BUCKET=bucket_arg, TEMP=temp_arg)

    script_dir = os.path.dirname(os.path.abspath(__file__))
    os.chdir(script_dir)
    with open('event.json') as event_file:
        event = json.load(event_file)

    context = {}
    lambda_handler(event, context)
EventBridge
公式ドキュメントをみると、`DMS-EVENT-0078` と `DMS-EVENT-0079` をEventBridgeでキャッチすれば、正常終了時とエラー時の両方でLambda実行ができそうです
AWSTemplateFormatVersion: "2010-09-09"
Description: CloudFormation template for EventBridge rule 'output-dms-result-csv-test'
Resources:
  # Rule on the default event bus that forwards matching DMS replication-task
  # events to the Lambda function listed under Targets.
  EventRule0:
    Type: AWS::Events::Rule
    Properties:
      Name: output-dms-result-csv-test
      State: ENABLED
      EventBusName: default
      EventPattern:
        source: [aws.dms]
        detail-type: ["DMS Replication Task State Change"]
        detail:
          type: [REPLICATION_TASK]
          # Only these two DMS event IDs trigger the export.
          eventId: [DMS-EVENT-0078, DMS-EVENT-0079]
      # NOTE(review): this template defines only the rule. The function must
      # also allow events.amazonaws.com to invoke it (e.g. an
      # AWS::Lambda::Permission) — confirm one exists outside this template.
      Targets:
        - Id: Ideae1c249-516d-4c4a-8bb9-6c40d87aa383
          Arn: 'arn:aws:lambda:ap-northeast-1:******:function:output-dms-result-csv-test'