はじめに
Amazon DynamoDBの増分エクスポートの機能が9/26に発表された。
これまではある時点の全件スナップショットしか取得できなかったが、「この期間に更新が入ったユーザ」を探せるようになったことで、Amazon DynamoDBでマスタ管理している際の差分履歴情報を別のテーブルで管理することが不要になった。
今回は、この増分エクスポートの仕様を確認しつつ、継続的に増分エクスポートを取得する方法を検討する。
なお、増分エクスポートはマネージメントコンソールからもCLIからも実行できるが、いずれもあくまでの1回限りの「指定した期間で差分抽出する」コマンドを実行するのみであり、継続的に取得する場合はAWS公式のブログでも言われている通り、EventBridgeとの連携が必要になる。
増分エクスポートの基本的な仕様
増分エクスポートの仕様は公式のAPIリファレンスを見るのが一番手っ取り早いだろう。
{
"ClientToken": "string",
"ExportFormat": "string",
"ExportTime": number,
"ExportType": "string",
"IncrementalExportSpecification": {
"ExportFromTime": number,
"ExportToTime": number,
"ExportViewType": "string"
},
"S3Bucket": "string",
"S3BucketOwner": "string",
"S3Prefix": "string",
"S3SseAlgorithm": "string",
"S3SseKmsKeyId": "string",
"TableArn": "string"
}
それぞれの項目の意味を、増分エクスポートに特化して書いておく。
プロパティ名 | 必須 | 意味 |
---|---|---|
S3Bucket | ○ | 出力先のS3バケット名 |
TableArn | ○ | 抽出元のDynamoDBのARN |
ClientToken | レスポンス情報で同名の項目が返される。これを使うと、APIの呼び出しが冪等になるらしい。具体的なユースケースは未考察 | |
ExportFormat |
DYNAMODB_JSON かION を指定jqで結果を確認するなら DYNAMODB_JSON にしておくのが無難 |
|
ExportTime | 増分エクスポート時は指定しない | |
ExportType |
INCREMENTAL_EXPORT を指定 |
|
IncrementalExportSpecification | ○ | 増分エクスポートの場合必須 |
ExportFromTime | ○ | 増分エクスポートの開始日時 この日時を含む Unixタイムスタンプ型 テーブル作成日時よりも前を指定するとエラー |
ExportToTime | ○ | 増分エクスポートの終了日時 この日時を含まない Unixタイムスタンプ型 現在日時よりも後を指定するとエラー |
ExportViewType | ○ |
NEW_IMAGE かNEW_AND_OLD_IMAGES を指定更新前の情報も欲しい場合は NEW_AND_OLD_IMAGES を指定 |
S3BucketOwner | バケットを保持しているAWSアカウントのID 具体的なユースケースは未考察 自分のアカウントのS3の場合は指定しなくても動作する |
|
S3Prefix | 指定した場合にオブジェクトにプレフィックスを付与できる。"/"も指定できるので、日付で階層にする等の分類も可能 これで指定したプレフィックスのディレクトリ配下に、公式ブログの「File layout」に記載の構造でエクスポートが行われる |
|
S3SseAlgorithm |
AES256 かKMS を指定エクスポートのS3バケットに対して暗号化して出力する場合は設定する |
|
S3SseKmsKeyId |
S3SseAlgorithm がKMS の場合にキーIDを指定するキーへのエイリアスでもOK |
新規にPutItemする場合
新規にPutItemする場合、以下のようなJSONが出力される(ExportFormat: "DYNAMODB_JSON"
の場合)。
実際にPutした情報は、id, request_id, write_dateだ。NewImage
だけが出力される動作となる。
{
"Metadata": {
"WriteTimestampMicros": {
"N": "1696050523828262"
}
},
"Keys": {
"id": {
"S": "00001284"
}
},
"NewImage": {
"id": {
"S": "00001284"
},
"request_id": {
"S": "24601436300e7967679fe0c785586a1d"
},
"write_date": {
"S": "2023-09-30 14:08:44.764063"
}
}
}
PutItemで上書き更新する場合
PutItemで上書き更新する場合は、以下のようになる。
OldImage
で更新前情報が取得可能だ。
{
"Metadata": {
"WriteTimestampMicros": {
"N": "1696050058135755"
}
},
"Keys": {
"id": {
"S": "00000831"
}
},
"OldImage": {
"id": {
"S": "00000831"
},
"request_id": {
"S": "1856157bd7f5b4aa8dc3ccb9303b9615"
},
"write_date": {
"S": "2023-09-30 04:58:07.067409"
}
},
"NewImage": {
"id": {
"S": "00000831"
},
"request_id": {
"S": "24601436300e7967679fe0c785586a1d"
},
"write_date": {
"S": "2023-09-30 14:00:59.070395"
}
}
}
UpdateItemで上書き更新する場合
UpdateItemでレコードに対してtest_data2の項目を追加してみる。
この場合も、ちゃんとレコードの全体のスナップショットがOldImage
にもNewImage
にも出力される。
「この更新をしたときにこのレコードは全体としてどんな形だったか」はレコード全体のスナップショットとして出力されることが確認できる。
"Metadata": {
"WriteTimestampMicros": {
"N": "1696080209015554"
}
},
"Keys": {
"id": {
"S": "00000002"
}
},
"OldImage": {
"id": {
"S": "00000002"
},
"request_id": {
"S": "97dd14fabd3538a076d955f4097033af"
},
"test_data": {
"S": "hogehoge"
},
"write_date": {
"S": "2023-09-30 21:58:36.635478"
}
},
"NewImage": {
"id": {
"S": "00000002"
},
"request_id": {
"S": "97dd14fabd3538a076d955f4097033af"
},
"test_data": {
"S": "hogehoge"
},
"test_data2": {
"S": "higehige"
},
"write_date": {
"S": "2023-09-30 21:58:36.635478"
}
}
}
増分エクスポートをEventBridgeから定期実行する
EventBridgeはSchedulerを使用しよう。
今回は、毎時30分に、前の時間の0分0秒から、59分59秒までを取得するように構築する。
スクリプト
TypeScriptで以下のように作成する。
今回は日時で取得するので、S3のプレフィックスは「年月/日付_時間」にする。
ディレクトリは日本時間の方が分かりやすいので、dayjs
でタイムゾーンを設定しながら作っていこう。
import { EventBridgeEvent } from 'aws-lambda';
import { DynamoDBClient, ExportTableToPointInTimeCommand } from '@aws-sdk/client-dynamodb';
// Settings for Localtimezone
import dayjs from 'dayjs';
import timezone from 'dayjs/plugin/timezone';
import utc from 'dayjs/plugin/utc';
// Configuration for DayJS
dayjs.extend(timezone);
dayjs.extend(utc);
const client = new DynamoDBClient({});
export const handler = async (event: EventBridgeEvent<'Scheduled Event', any>): Promise<void> => {
console.log(`Event: ${JSON.stringify(event, null, 2)}`);
const toTime = dayjs(event.time.substring(0, 13)).unix();
const fromTime = toTime - 3600;
const response = await client.send(new ExportTableToPointInTimeCommand({
TableArn: process.env.DDB_TABLE_ARN,
S3Bucket: process.env.S3_BUCKET_NAME,
S3Prefix: dayjs(fromTime * 1000).tz('Asia/Tokyo').format('YYYYMM/DD_HH0000'),
S3SseAlgorithm: 'KMS',
S3SseKmsKeyId: 'alias/aws/s3',
ExportFormat: 'DYNAMODB_JSON',
ExportType: 'INCREMENTAL_EXPORT',
IncrementalExportSpecification: {
ExportFromTime: dayjs(fromTime * 1000).toDate(),
ExportToTime: dayjs(event.time.substring(0, 13)).toDate(),
ExportViewType: 'NEW_AND_OLD_IMAGES',
},
}));
};
TerraformによるIaC
Lambda
特に設定上で難しいことはないが、この情報が重要であれば、dead_letter_config
でDLQを設定しておこう。
DLQからメールを送るなり、DLQ格納やイベントドロップのメトリクスを拾ってアラームを上げれば完璧だ。
※EventBridge Schedulerからの起動は非同期呼び出しになるため、EventBridge側ではなくてLambda側の設定が適用されるので注意。
スクリプト内で必要になる環境依存項目はちゃんと環境変数で外部から注入できるようにしておくのがセオリーだ。
特にS3バケットは必ず環境差分になるため、冪等性を高めるためにも外部注入可能にしておくべきだ。
data "archive_file" "lambda_eventtarget" {
type = "zip"
source_dir = "../scripts/dist"
output_path = "../outputs/lambda_function.zip"
}
resource "aws_lambda_function" "eventtarget" {
depends_on = [
aws_cloudwatch_log_group.lambda_eventtarget,
]
function_name = local.lambda_eventtarget_function_name
filename = data.archive_file.lambda_eventtarget.output_path
role = aws_iam_role.lambda_eventtarget.arn
handler = "index.handler"
source_code_hash = data.archive_file.lambda_eventtarget.output_base64sha256
runtime = "nodejs16.x"
memory_size = 128
timeout = 30
dead_letter_config {
target_arn = aws_sqs_queue.dlq.arn
}
environment {
variables = {
DDB_TABLE_ARN = aws_dynamodb_table.example.arn
S3_BUCKET_NAME = aws_s3_bucket.example.bucket
}
}
}
IAM(Lambda)
Lambdaに対して以下を付与しておこう。
dynamodb:ExportTableToPointInTime
だけで良いかと思いきや、実際にオブジェクトをPutするにあたり、s3
の権限の付与も必要なのが注意点だ。
resource "aws_iam_role" "lambda_eventtarget" {
name = local.iam_lambda_eventtarget_role_name
assume_role_policy = data.aws_iam_policy_document.lambda_eventtarget_assume.json
}
data "aws_iam_policy_document" "lambda_eventtarget_assume" {
statement {
effect = "Allow"
actions = [
"sts:AssumeRole",
]
principals {
type = "Service"
identifiers = [
"lambda.amazonaws.com",
]
}
}
}
resource "aws_iam_role_policy" "lambda_eventtarget_custom" {
name = local.iam_lambda_eventtarget_policy_name
role = aws_iam_role.lambda_eventtarget.name
policy = data.aws_iam_policy_document.lambda_eventtarget_custom.json
}
data "aws_iam_policy_document" "lambda_eventtarget_custom" {
statement {
effect = "Allow"
actions = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
]
resources = [
"*",
]
}
statement {
effect = "Allow"
actions = [
"dynamodb:ExportTableToPointInTime",
]
resources = [
aws_dynamodb_table.example.arn,
]
}
statement {
effect = "Allow"
actions = [
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl",
]
resources = [
"${aws_s3_bucket.example.arn}/*",
]
}
statement {
effect = "Allow"
actions = [
"sqs:SendMessage",
]
resources = [
aws_sqs_queue.dlq.arn,
]
}
}
EventBridge Scheduler
以下のように設定する。設定方法の詳細は以前の記事を参考にしていただきたい。
resource "aws_scheduler_schedule_group" "example" {
name = local.eventbridge_scheduler_group_name
}
resource "aws_scheduler_schedule" "example" {
name = local.eventbridge_scheduler_schedule_name
group_name = aws_scheduler_schedule_group.example.name
state = "ENABLED"
schedule_expression = "cron(30 * * * ? *)"
flexible_time_window {
mode = "OFF"
}
target {
arn = aws_lambda_function.eventtarget.arn
role_arn = aws_iam_role.eventbridge_scheduler.arn
}
}
IAM(EventBridge Scheduler)
Lambdaを起動できるように、lambda:InvokeFunction
を付与しておく。
resource "aws_iam_role" "eventbridge_scheduler" {
name = local.iam_eventbridge_role_name
assume_role_policy = data.aws_iam_policy_document.eventbridge_scheduler_assume.json
}
data "aws_iam_policy_document" "eventbridge_scheduler_assume" {
statement {
effect = "Allow"
actions = [
"sts:AssumeRole",
]
principals {
type = "Service"
identifiers = [
"scheduler.amazonaws.com",
]
}
}
}
resource "aws_iam_role_policy" "eventbridge_scheduler_custom" {
name = local.iam_eventbridge_policy_name
role = aws_iam_role.eventbridge_scheduler.name
policy = data.aws_iam_policy_document.eventbridge_scheduler_custom.json
}
data "aws_iam_policy_document" "eventbridge_scheduler_custom" {
statement {
effect = "Allow"
actions = [
"lambda:InvokeFunction",
]
resources = [
"*",
]
}
}
いざ、動かす!
設定を入れてしばらく動かしてみた結果が以下だ。
example-bucket/
├── 202309
│ ├── 30_200000
│ | └── AWSDynamoDB
│ | ├── 01696078291000-5fe6485b
│ | │ └── (中略)
│ | └── data
│ | └── (中略)
│ ├── 30_210000
│ | └── AWSDynamoDB
│ | ├── 01696079120000-b08969a5
│ | │ └── (中略)
│ | └── data
│ | └── (中略)
│ ├── 30_220000
│ | └── AWSDynamoDB
│ | ├── 01696080791000-0d84d28b
│ | │ └── (中略)
│ | └── data
│ | └── (中略)
│ └── 30_230000
│ └── AWSDynamoDB
│ ├── 01696087837000-1d9f59bb
│ │ └── (中略)
│ └── data
│ └── (中略)
└── 202310
├── 01_000000
│ └── AWSDynamoDB
│ ├── 01696091437000-ec972575
│ │ └── (中略)
| └── data
| └── (中略)
├── 01_010000
| └── AWSDynamoDB
│ ├── 01696091437000-ec972575
│ │ └── (中略)
| └── data
| └── (中略)
├── 01_020000
| └── AWSDynamoDB
│ ├── 01696091437000-ec972575
│ │ └── (中略)
| └── data
| └── (中略)
├── (中略)
└── 01_080000
└── AWSDynamoDB
├── 01696120237000-0f4bcd8b
│ ├── manifest-files.json
│ ├── manifest-files.md5
│ ├── manifest-summary.json
│ ├── manifest-summary.md5
│ └── _started
└── data
├── jkruwp23fq4jnft3wmeyn2l6w4.json.gz
├── mawnmul7mmzethvx7p23qgiwzi.json.gz
├── n7oizrevvu5v5j2hcflisdusny.json.gz
└── s56eakivnq5yte3fwxo4incp3m.json.gz
しっかり起動、取得できている。
これで、DynamoDBに発生した差分を継続的にエクスポートすることができるようになった!
Dead Letter Queueに格納された後の調査
なお、何かしらのエラーの起因でイベントドロップした場合のDLQでの情報は以下だけである。
これだけでは、いつの実行が失敗になったか分からないが、このRequestID
はLambdaのリクエストIDであるため、Lambdaの実行ログ上での紐付けが可能だ。
イベントのダンプから、23:30のスケジュールのスケジュールがエラーになってリトライオーバーしたということが調べられる。リカバリも、これに合わせたLambdaへのINPUTイベントのJSONを作れば復旧が可能だ。
増分エクスポートの出力でAmazon ION形式を扱う
スクリプトの以下の部分を変更することで、Amazon ION形式で出力することができるようになる。
const response = await client.send(new ExportTableToPointInTimeCommand({
TableArn: process.env.DDB_TABLE_ARN,
S3Bucket: process.env.S3_BUCKET_NAME,
S3Prefix: dayjs(fromTime * 1000).tz('Asia/Tokyo').format('YYYYMM/DD_HH0000'),
S3SseAlgorithm: 'KMS',
S3SseKmsKeyId: 'alias/aws/s3',
- ExportFormat: 'DYNAMODB_JSON',
+ ExportFormat: 'ION',
ExportType: 'INCREMENTAL_EXPORT',
IncrementalExportSpecification: {
ExportFromTime: dayjs(fromTime * 1000).toDate(),
ExportToTime: dayjs(event.time.substring(0, 13)).toDate(),
ExportViewType: 'NEW_AND_OLD_IMAGES',
},
}));
詳細はQLDB公式の開発者ガイドを参照してもらいたい。
端的に言うと、データ形式をフォーマットで表現することができるJSONのスーパーセット(ただしAWSの独自規格)だ。
データ形式がフォーマットに埋め込めるということは、DynamoDBのデータ形式である'S'とか'N'が不要になるということだ。
実際、ION形式で増分エクスポートすると、ファイルの1行(つまりはDynamoDBの1レコードの差分)は以下のように出力される。
$ion_1_0 {Record:{Keys:{id:"00000310"},Metadata:{WriteTimestampMicros:1696075268492555.},OldImage:{id:"00000310",request_id:"24601436300e7967679fe0c785586a1d",write_date:"2023-09-30 13:52:03.418714"},NewImage:{id:"00000310",request_id:"01d2bb6cb9fb5cc2743bdb46ffde1897",write_date:"2023-09-30 21:01:09.503804"}}}
整形すると以下のようになる。形式がない。これは見やすい。
$ion_1_0
{
Record:{
Keys:{
id:"00000310"
},
Metadata:{
WriteTimestampMicros:1696075268492555.
},
OldImage:{
id:"00000310",
request_id:"24601436300e7967679fe0c785586a1d",
write_date:"2023-09-30 13:52:03.418714"
},
NewImage:{
id:"00000310",
request_id:"01d2bb6cb9fb5cc2743bdb46ffde1897",
write_date:"2023-09-30 21:01:09.503804"
}
}
}
だが、残念ながら項目名をダブルクォートで囲っていないため、このままではjqで整形することができない。
仕方がないので、以下のようなスクリプトを作ってダウンコンバージョンしてJSONに変換して動かそう。
from io import BytesIO
from amazon.ion.json_encoder import IonToJSONEncoder
import sys
import json
import amazon.ion.simpleion as ion
args = sys.argv
del args[0]
for filename in args:
with open(filename, "rb") as filehandler:
for line in filehandler:
json_string = json.dumps(ion.load(BytesIO(line)), cls=IonToJSONEncoder)
print(json_string)
amazon.ionとjsonconversionは必要に応じてpip install
すれば動作する。
※ただし、jsonconversionはWSL2のubuntuではインストールができなかった。
これで、ファイルの出力結果をパイプでjqに食わせれば、以下のような結果を得ることができる。
{
"Record": {
"Keys": {
"id": "00000310"
},
"Metadata": {
"WriteTimestampMicros": 1696075268492555
},
"OldImage": {
"id": "00000310",
"request_id": "24601436300e7967679fe0c785586a1d",
"write_date": "2023-09-30 13:52:03.418714"
},
"NewImage": {
"id": "00000310",
"request_id": "01d2bb6cb9fb5cc2743bdb46ffde1897",
"write_date": "2023-09-30 21:01:09.503804"
}
}
}
これを活用すれば、DynamoDBの見づらい形式ともオサラバだ!