2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Amazon DynamoDBの増分エクスポートを継続的に取得するプラクティス

Last updated at Posted at 2023-10-01

はじめに

Amazon DynamoDBの増分エクスポートの機能が9/26に発表された
これまではある時点の全件スナップショットしか取得できなかったが、「この期間に更新が入ったユーザ」を探せるようになったことで、Amazon DynamoDBでマスタ管理している際の差分履歴情報を別のテーブルで管理することが不要になった。

今回は、この増分エクスポートの仕様を確認しつつ、継続的に増分エクスポートを取得する方法を検討する。

なお、増分エクスポートはマネージメントコンソールからもCLIからも実行できるが、いずれもあくまでの1回限りの「指定した期間で差分抽出する」コマンドを実行するのみであり、継続的に取得する場合はAWS公式のブログでも言われている通り、EventBridgeとの連携が必要になる。

増分エクスポートの基本的な仕様

増分エクスポートの仕様は公式のAPIリファレンスを見るのが一番手っ取り早いだろう。

{
   "ClientToken": "string",
   "ExportFormat": "string",
   "ExportTime": number,
   "ExportType": "string",
   "IncrementalExportSpecification": { 
      "ExportFromTime": number,
      "ExportToTime": number,
      "ExportViewType": "string"
   },
   "S3Bucket": "string",
   "S3BucketOwner": "string",
   "S3Prefix": "string",
   "S3SseAlgorithm": "string",
   "S3SseKmsKeyId": "string",
   "TableArn": "string"
}

それぞれの項目の意味を、増分エクスポートに特化して書いておく。

プロパティ名 必須 意味
S3Bucket 出力先のS3バケット名
TableArn 抽出元のDynamoDBのARN
ClientToken レスポンス情報で同名の項目が返される。これを使うと、APIの呼び出しが冪等になるらしい。具体的なユースケースは未考察
ExportFormat DYNAMODB_JSONIONを指定
jqで結果を確認するならDYNAMODB_JSONにしておくのが無難
ExportTime 増分エクスポート時は指定しない
ExportType INCREMENTAL_EXPORTを指定
IncrementalExportSpecification 増分エクスポートの場合必須
    ExportFromTime 増分エクスポートの開始日時
この日時を含む
Unixタイムスタンプ型
テーブル作成日時よりも前を指定するとエラー
    ExportToTime 増分エクスポートの終了日時
この日時を含まない
Unixタイムスタンプ型
現在日時よりも後を指定するとエラー
    ExportViewType NEW_IMAGENEW_AND_OLD_IMAGESを指定
更新前の情報も欲しい場合はNEW_AND_OLD_IMAGESを指定
S3BucketOwner バケットを保持しているAWSアカウントのID
具体的なユースケースは未考察
自分のアカウントのS3の場合は指定しなくても動作する
S3Prefix 指定した場合にオブジェクトにプレフィックスを付与できる。"/"も指定できるので、日付で階層にする等の分類も可能
これで指定したプレフィックスのディレクトリ配下に、公式ブログの「File layout」に記載の構造でエクスポートが行われる
S3SseAlgorithm AES256KMSを指定
エクスポートのS3バケットに対して暗号化して出力する場合は設定する
S3SseKmsKeyId S3SseAlgorithmKMSの場合にキーIDを指定する
キーへのエイリアスでもOK

新規にPutItemする場合

新規にPutItemする場合、以下のようなJSONが出力される(ExportFormat: "DYNAMODB_JSON"の場合)。
実際にPutした情報は、id, request_id, write_dateだ。NewImageだけが出力される動作となる。

{
  "Metadata": {
    "WriteTimestampMicros": {
      "N": "1696050523828262"
    }
  },
  "Keys": {
    "id": {
      "S": "00001284"
    }
  },
  "NewImage": {
    "id": {
      "S": "00001284"
    },
    "request_id": {
      "S": "24601436300e7967679fe0c785586a1d"
    },
    "write_date": {
      "S": "2023-09-30 14:08:44.764063"
    }
  }
}

PutItemで上書き更新する場合

PutItemで上書き更新する場合は、以下のようになる。
OldImageで更新前情報が取得可能だ。

{
  "Metadata": {
    "WriteTimestampMicros": {
      "N": "1696050058135755"
    }
  },
  "Keys": {
    "id": {
      "S": "00000831"
    }
  },
  "OldImage": {
    "id": {
      "S": "00000831"
    },
    "request_id": {
      "S": "1856157bd7f5b4aa8dc3ccb9303b9615"
    },
    "write_date": {
      "S": "2023-09-30 04:58:07.067409"
    }
  },
  "NewImage": {
    "id": {
      "S": "00000831"
    },
    "request_id": {
      "S": "24601436300e7967679fe0c785586a1d"
    },
    "write_date": {
      "S": "2023-09-30 14:00:59.070395"
    }
  }
}

UpdateItemで上書き更新する場合

UpdateItemでレコードに対してtest_data2の項目を追加してみる。
この場合も、ちゃんとレコードの全体のスナップショットがOldImageにもNewImageにも出力される。
「この更新をしたときにこのレコードは全体としてどんな形だったか」はレコード全体のスナップショットとして出力されることが確認できる。


  "Metadata": {
    "WriteTimestampMicros": {
      "N": "1696080209015554"
    }
  },
  "Keys": {
    "id": {
      "S": "00000002"
    }
  },
  "OldImage": {
    "id": {
      "S": "00000002"
    },
    "request_id": {
      "S": "97dd14fabd3538a076d955f4097033af"
    },
    "test_data": {
      "S": "hogehoge"
    },
    "write_date": {
      "S": "2023-09-30 21:58:36.635478"
    }
  },
  "NewImage": {
    "id": {
      "S": "00000002"
    },
    "request_id": {
      "S": "97dd14fabd3538a076d955f4097033af"
    },
    "test_data": {
      "S": "hogehoge"
    },
    "test_data2": {
      "S": "higehige"
    },
    "write_date": {
      "S": "2023-09-30 21:58:36.635478"
    }
  }
}

増分エクスポートをEventBridgeから定期実行する

EventBridgeはSchedulerを使用しよう。
今回は、毎時30分に、前の時間の0分0秒から、59分59秒までを取得するように構築する。

スクリプト

TypeScriptで以下のように作成する。
今回は日時で取得するので、S3のプレフィックスは「年月/日付_時間」にする。
ディレクトリは日本時間の方が分かりやすいので、dayjsでタイムゾーンを設定しながら作っていこう。

import { EventBridgeEvent } from 'aws-lambda';
import { DynamoDBClient, ExportTableToPointInTimeCommand } from '@aws-sdk/client-dynamodb';

// Settings for Localtimezone
import dayjs from 'dayjs';
import timezone from 'dayjs/plugin/timezone';
import utc from 'dayjs/plugin/utc';

// Configuration for DayJS
dayjs.extend(timezone);
dayjs.extend(utc);

const client = new DynamoDBClient({});

export const handler = async (event: EventBridgeEvent<'Scheduled Event', any>): Promise<void> => {
  console.log(`Event: ${JSON.stringify(event, null, 2)}`);

  const toTime = dayjs(event.time.substring(0, 13)).unix();
  const fromTime = toTime - 3600;

  const response = await client.send(new ExportTableToPointInTimeCommand({
    TableArn: process.env.DDB_TABLE_ARN,
    S3Bucket: process.env.S3_BUCKET_NAME,
    S3Prefix: dayjs(fromTime * 1000).tz('Asia/Tokyo').format('YYYYMM/DD_HH0000'),
    S3SseAlgorithm: 'KMS',
    S3SseKmsKeyId: 'alias/aws/s3',
    ExportFormat: 'DYNAMODB_JSON',
    ExportType: 'INCREMENTAL_EXPORT',
    IncrementalExportSpecification: {
      ExportFromTime: dayjs(fromTime * 1000).toDate(),
      ExportToTime: dayjs(event.time.substring(0, 13)).toDate(),
      ExportViewType: 'NEW_AND_OLD_IMAGES',
    },
  }));
};

TerraformによるIaC

Lambda

特に設定上で難しいことはないが、この情報が重要であれば、dead_letter_configでDLQを設定しておこう。
DLQからメールを送るなり、DLQ格納やイベントドロップのメトリクスを拾ってアラームを上げれば完璧だ。
※EventBridge Schedulerからの起動は非同期呼び出しになるため、EventBridge側ではなくてLambda側の設定が適用されるので注意。

スクリプト内で必要になる環境依存項目はちゃんと環境変数で外部から注入できるようにしておくのがセオリーだ。
特にS3バケットは必ず環境差分になるため、冪等性を高めるためにも外部注入可能にしておくべきだ。

data "archive_file" "lambda_eventtarget" {
  type        = "zip"
  source_dir  = "../scripts/dist"
  output_path = "../outputs/lambda_function.zip"
}

resource "aws_lambda_function" "eventtarget" {
  depends_on = [
    aws_cloudwatch_log_group.lambda_eventtarget,
  ]

  function_name    = local.lambda_eventtarget_function_name
  filename         = data.archive_file.lambda_eventtarget.output_path
  role             = aws_iam_role.lambda_eventtarget.arn
  handler          = "index.handler"
  source_code_hash = data.archive_file.lambda_eventtarget.output_base64sha256
  runtime          = "nodejs16.x"

  memory_size = 128
  timeout     = 30

  dead_letter_config {
    target_arn = aws_sqs_queue.dlq.arn
  }

  environment {
    variables = {
      DDB_TABLE_ARN  = aws_dynamodb_table.example.arn
      S3_BUCKET_NAME = aws_s3_bucket.example.bucket
    }
  }
}

IAM(Lambda)

Lambdaに対して以下を付与しておこう。
dynamodb:ExportTableToPointInTimeだけで良いかと思いきや、実際にオブジェクトをPutするにあたり、s3の権限の付与も必要なのが注意点だ。

resource "aws_iam_role" "lambda_eventtarget" {
  name               = local.iam_lambda_eventtarget_role_name
  assume_role_policy = data.aws_iam_policy_document.lambda_eventtarget_assume.json
}

data "aws_iam_policy_document" "lambda_eventtarget_assume" {
  statement {
    effect = "Allow"

    actions = [
      "sts:AssumeRole",
    ]

    principals {
      type = "Service"
      identifiers = [
        "lambda.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role_policy" "lambda_eventtarget_custom" {
  name   = local.iam_lambda_eventtarget_policy_name
  role   = aws_iam_role.lambda_eventtarget.name
  policy = data.aws_iam_policy_document.lambda_eventtarget_custom.json
}

data "aws_iam_policy_document" "lambda_eventtarget_custom" {
  statement {
    effect = "Allow"

    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents",
    ]

    resources = [
      "*",
    ]
  }
  statement {
    effect = "Allow"

    actions = [
      "dynamodb:ExportTableToPointInTime",
    ]

    resources = [
      aws_dynamodb_table.example.arn,
    ]
  }
  statement {
    effect = "Allow"

    actions = [
      "s3:AbortMultipartUpload",
      "s3:PutObject",
      "s3:PutObjectAcl",
    ]

    resources = [
      "${aws_s3_bucket.example.arn}/*",
    ]
  }
  statement {
    effect = "Allow"

    actions = [
      "sqs:SendMessage",
    ]

    resources = [
      aws_sqs_queue.dlq.arn,
    ]
  }
}

EventBridge Scheduler

以下のように設定する。設定方法の詳細は以前の記事を参考にしていただきたい

resource "aws_scheduler_schedule_group" "example" {
  name = local.eventbridge_scheduler_group_name
}

resource "aws_scheduler_schedule" "example" {
  name       = local.eventbridge_scheduler_schedule_name
  group_name = aws_scheduler_schedule_group.example.name

  state = "ENABLED"

  schedule_expression = "cron(30 * * * ? *)"

  flexible_time_window {
    mode = "OFF"
  }

  target {
    arn      = aws_lambda_function.eventtarget.arn
    role_arn = aws_iam_role.eventbridge_scheduler.arn
  }
}

IAM(EventBridge Scheduler)

Lambdaを起動できるように、lambda:InvokeFunctionを付与しておく。

resource "aws_iam_role" "eventbridge_scheduler" {
  name               = local.iam_eventbridge_role_name
  assume_role_policy = data.aws_iam_policy_document.eventbridge_scheduler_assume.json
}

data "aws_iam_policy_document" "eventbridge_scheduler_assume" {
  statement {
    effect = "Allow"

    actions = [
      "sts:AssumeRole",
    ]

    principals {
      type = "Service"
      identifiers = [
        "scheduler.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role_policy" "eventbridge_scheduler_custom" {
  name   = local.iam_eventbridge_policy_name
  role   = aws_iam_role.eventbridge_scheduler.name
  policy = data.aws_iam_policy_document.eventbridge_scheduler_custom.json
}

data "aws_iam_policy_document" "eventbridge_scheduler_custom" {
  statement {
    effect = "Allow"

    actions = [
      "lambda:InvokeFunction",
    ]

    resources = [
      "*",
    ]
  }
}

いざ、動かす!

設定を入れてしばらく動かしてみた結果が以下だ。

example-bucket/
├── 202309
│   ├── 30_200000
│   |   └── AWSDynamoDB
│   |       ├── 01696078291000-5fe6485b
│   |       │   └── (中略)
│   |       └── data
│   |           └── (中略)
│   ├── 30_210000
│   |   └── AWSDynamoDB
│   |       ├── 01696079120000-b08969a5
│   |       │   └── (中略)
│   |       └── data
│   |           └── (中略)
│   ├── 30_220000
│   |   └── AWSDynamoDB
│   |       ├── 01696080791000-0d84d28b
│   |       │   └── (中略)
│   |       └── data
│   |           └── (中略)
│   └── 30_230000
│       └── AWSDynamoDB
│           ├── 01696087837000-1d9f59bb
│           │   └── (中略)
│           └── data
│               └── (中略)
└── 202310
    ├── 01_000000
    │   └── AWSDynamoDB
    │       ├── 01696091437000-ec972575
    │       │   └── (中略)
    |       └── data
    |           └── (中略)
    ├── 01_010000
    |   └── AWSDynamoDB
    │       ├── 01696091437000-ec972575
    │       │   └── (中略)
    |       └── data
    |           └── (中略)
    ├── 01_020000
    |   └── AWSDynamoDB
    │       ├── 01696091437000-ec972575
    │       │   └── (中略)
    |       └── data
    |           └── (中略)
    ├── (中略)
    └── 01_080000
        └── AWSDynamoDB
            ├── 01696120237000-0f4bcd8b
            │   ├── manifest-files.json
            │   ├── manifest-files.md5
            │   ├── manifest-summary.json
            │   ├── manifest-summary.md5
            │   └── _started
            └── data
                ├── jkruwp23fq4jnft3wmeyn2l6w4.json.gz
                ├── mawnmul7mmzethvx7p23qgiwzi.json.gz
                ├── n7oizrevvu5v5j2hcflisdusny.json.gz
                └── s56eakivnq5yte3fwxo4incp3m.json.gz

しっかり起動、取得できている。
これで、DynamoDBに発生した差分を継続的にエクスポートすることができるようになった!

Dead Letter Queueに格納された後の調査

なお、何かしらのエラーの起因でイベントドロップした場合のDLQでの情報は以下だけである。

image.png

これだけでは、いつの実行が失敗になったか分からないが、このRequestIDはLambdaのリクエストIDであるため、Lambdaの実行ログ上での紐付けが可能だ。

image.png

イベントのダンプから、23:30のスケジュールのスケジュールがエラーになってリトライオーバーしたということが調べられる。リカバリも、これに合わせたLambdaへのINPUTイベントのJSONを作れば復旧が可能だ。

増分エクスポートの出力でAmazon ION形式を扱う

スクリプトの以下の部分を変更することで、Amazon ION形式で出力することができるようになる。

  const response = await client.send(new ExportTableToPointInTimeCommand({
    TableArn: process.env.DDB_TABLE_ARN,
    S3Bucket: process.env.S3_BUCKET_NAME,
    S3Prefix: dayjs(fromTime * 1000).tz('Asia/Tokyo').format('YYYYMM/DD_HH0000'),
    S3SseAlgorithm: 'KMS',
    S3SseKmsKeyId: 'alias/aws/s3',
-    ExportFormat: 'DYNAMODB_JSON',
+    ExportFormat: 'ION',
    ExportType: 'INCREMENTAL_EXPORT',
    IncrementalExportSpecification: {
      ExportFromTime: dayjs(fromTime * 1000).toDate(),
      ExportToTime: dayjs(event.time.substring(0, 13)).toDate(),
      ExportViewType: 'NEW_AND_OLD_IMAGES',
    },
  }));

詳細はQLDB公式の開発者ガイドを参照してもらいたい。
端的に言うと、データ形式をフォーマットで表現することができるJSONのスーパーセット(ただしAWSの独自規格)だ。

データ形式がフォーマットに埋め込めるということは、DynamoDBのデータ形式である'S'とか'N'が不要になるということだ。

実際、ION形式で増分エクスポートすると、ファイルの1行(つまりはDynamoDBの1レコードの差分)は以下のように出力される。

$ion_1_0 {Record:{Keys:{id:"00000310"},Metadata:{WriteTimestampMicros:1696075268492555.},OldImage:{id:"00000310",request_id:"24601436300e7967679fe0c785586a1d",write_date:"2023-09-30 13:52:03.418714"},NewImage:{id:"00000310",request_id:"01d2bb6cb9fb5cc2743bdb46ffde1897",write_date:"2023-09-30 21:01:09.503804"}}}

整形すると以下のようになる。形式がない。これは見やすい。

$ion_1_0
{
  Record:{
    Keys:{
      id:"00000310"
    },
    Metadata:{
      WriteTimestampMicros:1696075268492555.
    },
    OldImage:{
      id:"00000310",
      request_id:"24601436300e7967679fe0c785586a1d",
      write_date:"2023-09-30 13:52:03.418714"
    },
    NewImage:{
      id:"00000310",
      request_id:"01d2bb6cb9fb5cc2743bdb46ffde1897",
      write_date:"2023-09-30 21:01:09.503804"
    }
  }
}

だが、残念ながら項目名をダブルクォートで囲っていないため、このままではjqで整形することができない。

仕方がないので、以下のようなスクリプトを作ってダウンコンバージョンしてJSONに変換して動かそう。

ion2json.py
from io import BytesIO
from amazon.ion.json_encoder import IonToJSONEncoder
import sys
import json
import amazon.ion.simpleion as ion

args = sys.argv
del args[0]

for filename in args:
  with open(filename, "rb") as filehandler:
    for line in filehandler:
      json_string = json.dumps(ion.load(BytesIO(line)), cls=IonToJSONEncoder)
      print(json_string)

amazon.ionとjsonconversionは必要に応じてpip installすれば動作する。
※ただし、jsonconversionはWSL2のubuntuではインストールができなかった。

これで、ファイルの出力結果をパイプでjqに食わせれば、以下のような結果を得ることができる。

{
  "Record": {
    "Keys": {
      "id": "00000310"
    },
    "Metadata": {
      "WriteTimestampMicros": 1696075268492555
    },
    "OldImage": {
      "id": "00000310",
      "request_id": "24601436300e7967679fe0c785586a1d",
      "write_date": "2023-09-30 13:52:03.418714"
    },
    "NewImage": {
      "id": "00000310",
      "request_id": "01d2bb6cb9fb5cc2743bdb46ffde1897",
      "write_date": "2023-09-30 21:01:09.503804"
    }
  }
}

これを活用すれば、DynamoDBの見づらい形式ともオサラバだ!

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?