1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DynamoDBの変更データキャプチャを試してみた

Posted at

背景・目的

DynamoDB(以降、DDBといいます。)の変更データキャプチャ(CDC)について、触る機会があったので知識の整理と、簡単に試してみます。

まとめ

下記に特徴を整理します。

特徴 説明
概要 ・DDBでは、テーブルに保存された項目の変更を、変更の発生時にキャプチャできる
・CDCレコードのニアリアルタイムをサポートしている
ストリーミングオプション ・DDB用のKDS
・DynamoDB Streams

概要

Amazon DynamoDB の変更データキャプチャを元に整理します。

多くのアプリケーションでは、DynamoDB テーブルに保存された項目の変更を、変更の発生時にキャプチャすることで利点を利用できます。以下に示しているのは、いくつかのユースケースの例です。

  • 人気のモバイルアプリケーションは、1 秒あたり数千件の更新速度で、DynamoDB テーブルのデータを変更します。別のアプリケーションがこれらの更新に関するデータをキャプチャして保存し、 near-real-time モバイルアプリの使用状況メトリックを提供します。
  • 金融アプリケーションは、DynamoDB テーブル内の株式市場データを変更します。parallel 実行されるさまざまなアプリケーションが、これらの変化をリアルタイムで追跡し、株価の動きに基づいてポートフォリオを計算し value-at-risk、自動的に再調整します。
  • 輸送車両や産業機器のセンサーは、DynamoDB テーブルにデータを送信します。さまざまなアプリケーションがパフォーマンスをモニタリングし、問題が検出されたときにメッセージングアラートを送信し、機械学習アルゴリズムを適用して潜在的な欠陥を予測し、データを圧縮して Amazon Simple Storage Service (Amazon S3) にアーカイブします。
  • アプリケーションは、友人の 1 人が新しい画像をアップロードするとすぐに、グループ内のすべての友人のモバイルデバイスに通知を自動送信します。
  • 新しいお客様がデータを DynamoDB テーブルに追加します。このイベントにより、新しいお客様にようこそメールを送信する別のアプリケーションが起動されます。

DynamoDB は、項目レベルの変更データキャプチャレコードのストリーミングをほぼリアルタイムでサポートします。これらのストリーミングを使用し、内容に基づいてアクションを実行するアプリケーションを構築できます。

  • DDBでは、テーブルに保存された項目の変更を、変更の発生時にキャプチャできる
  • CDCレコードのニアリアルタイムをサポートしている

変更データキャプチャのストリーミングオプション

DDBには、2つのストリーミングもでるがあります。
下記に特徴をまとめます。

プロパティ DDB 用 KDS DDB Streams
データ保持期間 1年まで 24H
KCLサポート KCL 1.x、2.X KCL 1.X
コンシューマー数 シャードごとに最大5つの同時コンシューマー
または、ファンアウトが強化されたシャードごとに最大20の同時コンシューマ
シャードごとに最大2つの同時コンシューマ
スループットクォーター 無制限 DDBテーブルとリージョンごとのスループットクォーターの対象
レコードの配信モデル ファンアウト拡張でPush HTTP経由でPull
レコードの順序 各ストリーミングレコードのタイムスタンプ属性を使用して、DDB テーブルで変更が発生した実際の順序を特定できる。 DDBテーブルで変更された各項目について、ストリーミングレコードは項目に対する実際の変更と同じ順序で出現する
重複レコード 表示される場合がある 表示されることはない
処理オプション Lambda、Flink、KDF、Glueストリーミング Lambda、DDB Stream Kinesis adapter
耐久性 AZにより中断することなく、自動でF/Oできる 同左

実践

今回、このような構成で試してみます。
CDCのストリーミングオプションは、2つありますが、今回はKDSを使用します。

image.png

S3バケットの作成

  1. AWSにサインインします
  2. S3バケットを作成します

Kinesis Data Streamsの作成

  1. Kinesisのトップページに移動します

  2. 「Kinesis Data Stream」を選択し、データストリームを作成をクリックします

  3. データストリーム名を指定し、「データストリームの作成」をクリックします

DynamoDBの作成

テーブルの作成

以前、DynamoDBを試してみたでDDBを作成しましたが、同じようにテーブルを作成します。

  1. DDBのトップページに遷移します
  2. 「テーブルの作成」をクリックします
  3. 下記を入力し、「テーブルの作成」をクリックします
    • テーブル名
    • PKとソートキー
      image.png

ストリームの作成

今回の本命である、変更データキャプチャを設定します。

  1. 上記で作成したDDBテーブルを選択します

  2. 「エクスポートおよびストリーム」タブをクリックします
    image.png

  3. KDSの「オンにする」をクリックします
    image.png

  4. すでに作成したKDSを指定し、「ストリームをオンにする」をクリックします
    image.png

Data Firehoseの作成

  1. Kinesisのトップページに移動します

  2. 「Amazon Data Firehose 」を選択し、Firehoseストリームを作成をクリックします

  3. ソースと送信先を選択で、下記を選択します

    • ソース:Kinesis Data Streams
    • 送信先:S3
      image.png
  4. ソースの設定で、作成したKDSを指定します
    image.png

  5. Firehoseストリーム名を指定します
    image.png

  6. 送信先を設定します

    • S3バケット:作成したS3バケットのパス
    • タイムゾーン:Asia/Tokyo
      image.png
  7. バッファヒント等で、サイズは1MiB、バッファ間隔を150秒にしました。
    image.png

  8. 最後に「Firehoseストリームを作成」をクリックします

データの書き込み〜S3の確認

データの登録

  1. 作成したDDBテーブルを選択し、「項目を探索」をクリックします
    image.png

  2. 項目を作成をクリックします
    image.png

  3. 値を入力し、「項目を作成」をクリックします
    image.png

  4. 再度繰り返し、下記のようになりました
    image.png

KDSの確認

  1. 分かりづらいですが、レコードは受信したようです
    image.png

Firehouseの確認

  1. レコードは受信され、S3への配信も成功したようです
    image.png

S3の確認

  1. ファイルが到着していました
    image.png

  2. S3 Selectで表示してみます。下記のようなデータが表示できました。たしかにとうろく

    {
      "awsRegion": "ap-northeast-1",
      "eventID": "6d42f935-f293-40fb-bf3e-7ae9065dd5c6",
      "eventName": "INSERT",
      "userIdentity": null,
      "recordFormat": "application/json",
      "tableName": "streamtest",
      "dynamodb": {
        "ApproximateCreationDateTime": 1712969103175023,
        "Keys": {
          "id": {
            "S": "1"
          },
          "timestamp": {
            "S": "20240413094400"
          }
        },
        "NewImage": {
          "id": {
            "S": "1"
          },
          "timestamp": {
            "S": "20240413094400"
          }
        },
        "SizeBytes": 52,
        "ApproximateCreationDateTimePrecision": "MICROSECOND"
      },
      "eventSource": "aws:dynamodb"
    }
    {
      "awsRegion": "ap-northeast-1",
      "eventID": "e6d41715-de17-4599-bf22-d6216751651c",
      "eventName": "INSERT",
      "userIdentity": null,
      "recordFormat": "application/json",
      "tableName": "streamtest",
      "dynamodb": {
        "ApproximateCreationDateTime": 1712969128680631,
        "Keys": {
          "id": {
            "S": "2"
          },
          "timestamp": {
            "S": "202404130944500"
          }
        },
        "NewImage": {
          "id": {
            "S": "2"
          },
          "timestamp": {
            "S": "202404130944500"
          }
        },
        "SizeBytes": 54,
        "ApproximateCreationDateTimePrecision": "MICROSECOND"
      },
      "eventSource": "aws:dynamodb"
    }
    

考察

今回、DDBのCDCを使って、S3までデータを連携してみました。ニアリアルタイムで、S3に連携できるのはよいですね。
次回以降は、FirehoseのTransformについて試してみようと思います

参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?