5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

CDKでDynamoDBのデータをAthenaで分析する環境をつくる

Posted at

はじめに

こんにちは。
本日は、CDKでDynamoDBのデータをAthenaで分析する環境の作り方について紹介していきます。

DynamoDBは高パフォーマンスでフルマネージドなキー・バリューデータベースですが、その性質上、検索については一括スキャン、もしくは、インデックスに基づく検索(最大5種類)とその絞り込みくらいしかできず、データの集計を行うにはわりと苦労します。
昨年より、DynamoDBにエクスポート機能が搭載されまして、データベースの本番稼働に負荷をかけることなく、S3にデータをエクスポートできるようになりました。

本日は、このエクスポート機能とサーバレスなクエリサービスであるAthenaを用いて、データの集計をできるような環境をCDKで構築する方法について紹介したいと思います。

今回は、以下のような簡易的なデータがDynamoDBのテーブルに入っていることを想定します。

id name createdAt
1 Alice 2021-09-01T02:00:00.000Z
2 Bob 2021-09-02T10:00:00.000Z
... ... ...

エクスポート元のDynamoDBのテーブルを作成する

まずは、DynamoDBのテーブルを作成します。これはすでにあるテーブルでも問題ありません。
エクスポート機能を利用するには、PITR(ポイントインタイムリカバリ)を有効にする必要があります。

lib/demo-stack.ts
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';

export class DemoStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    // データ用のDynamoDBテーブル
    const dynamodbTable = new ddb.Table(this, 'DataTable', {
      partitionKey: {
        name: 'id',
        type: ddb.AttributeType.STRING,
      },
      pointInTimeRecovery: true, // 必須
    });
  }
}

エクスポート先のS3バケットを作成する

次に、DynamoDBからエクスポートしたデータを保存するS3バケットを作成します。
バケットの名前はなんでも大丈夫です。

lib/demo-stack.ts
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';

export class DemoStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    ...

    // エクスポート用のS3バケット
    const bucket = new s3.Bucket(this, 'ExportBucket', {
      bucketName: 'demo-export-bucket',
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
    });
  }
}

エクスポートしたデータをAthenaからクエリを行うための設定をする

次に、エクスポートしたデータに対してAthenaを用いてクエリを行えるようにしていきます。
現在では、Athenaのデータベースやテーブル設定は、AWS Glueというサービスに統合されています。
そのため、Athenaでクエリを行うためには、AWS Glueにてデータベースとテーブルの設定を行っていきます。

DynamoDBのエクスポート結果は、以下のようなJSONデータになっています。

{ "Item": { "id": { "S": "1" }, "name": { "S": "Alice" }, "createdAt": { "S": "2021-09-01T02:00:00.000Z" } } }
{ "Item": { "id": { "S": "2" }, "name": { "S": "Bob" }, "createdAt": { "S": "2021-09-02T10:00:00.000Z" } } }

そのため、Athenaからクエリを行う際は、SELECT nameではなく、SELECT Item.name.Sという形になります。
よって、そのようなクエリが行えるよう、Glueのテーブル設定を行います。
S3のPrefixには、DynamoDBのエクスポートのルールに従い、ダミーで設定しています。

lib/demo-stack.ts
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as glue from '@aws-cdk/aws-glue';

export class DemoStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    ...

    // Athena用のデータベースとテーブルの設定
    const database = new glue.Database(this, 'DemoDatabase', {
      databaseName: 'demo-db',
    });
    const table = new glue.Table(this, 'DemoUserTable', {
      bucket,
      s3Prefix: 'AWSDynamoDB/12345678901234-12345678/data/',
      database,
      tableName: 'user',
      columns: [
        {
          name: 'item',
          type: glue.Schema.struct([
            {
              name: 'id',
              type: glue.Schema.struct([{
                name: 'S',
                type: glue.Schema.STRING,
              }]),
            },
            {
              name: 'name',
              type: glue.Schema.struct([{
                name: 'S',
                type: glue.Schema.STRING,
              }]),
            },
            {
              name: 'createdAt',
              type: glue.Schema.struct([{
                name: 'S',
                type: glue.Schema.STRING,
              }]),
            },
          ])
        }
      ],
      dataFormat: glue.DataFormat.JSON,
    });
  }
}

エクスポートするためのLambda関数を作成する

次に、DynamoDBからS3へとエクスポートを行う処理を記述します。
この際、単にエクスポートするだけではGlueに設定したテーブル設定が、最新のエクスポート先に設定されていないため、最新のデータを取得できなくなります。
よって、エクスポートだけではなく、Glueのテーブル設定の変更も同時に行います。

まずは、CDKでLambda Functionを作成します。今回はNode.jsを使用しています。
同時に、Lambda Functionで利用する環境変数や権限も設定しています。

ざっくりいうと、以下のような権限が必要となります。

  • エクスポート元となるDynamoDBのEXPORT権限
  • エクスポート先となるS3バケットへのPUT権限
  • 対象となるGlueのテーブルのREAD、UPDATE権限
lib/demo-stack.ts
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as glue from '@aws-cdk/aws-glue';
import * as lambda from '@aws-cdk/aws-lambda';
import * as lambdaNode from '@aws-cdk/aws-lambda-nodejs';
import * as iam from '@aws-cdk/aws-iam';
import { Duration } from '@aws-cdk/core';

export class DemoStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    // Lambda Functionの作成
    const lambdaFunction = new lambdaNode.NodejsFunction(this, 'DataExportFunction', {
      runtime: lambda.Runtime.NODEJS_14_X,
      entry: 'lib/resources/handler.ts',
      handler: 'handler',
      memorySize: 128,
      timeout: Duration.minutes(5),
    });
    lambdaFunction.addEnvironment('DYNAMO_TABLE_ARN', dynamodbTable.tableArn);
    lambdaFunction.addEnvironment('EXPORT_S3_BUCKET', bucket.bucketName);
    lambdaFunction.addEnvironment('GLUE_DATABASE_NAME', database.databaseName);
    lambdaFunction.addEnvironment('GLUE_TABLE_NAME', table.tableName);
    lambdaFunction.addToRolePolicy(new iam.PolicyStatement({
      actions: ['dynamodb:ExportTableToPointInTime'],
      resources: [
        dynamodbTable.tableArn,
      ],
      effect: iam.Effect.ALLOW,
    }));
    bucket.grantPut(lambdaFunction);
    lambdaFunction.addToRolePolicy(new iam.PolicyStatement({
      actions: ['glue:GetTable', 'glue:UpdateTable'],
      resources: [
        database.catalogArn,
        database.databaseArn,
        table.tableArn,
      ],
      effect: iam.Effect.ALLOW,
    }));
  }
}

そして、以下がエクスポートとGlueのテーブルのアップデートを行うための処理となります。
現在、GlueではテーブルのターゲットとなるS3のロケーションのみを変更するような操作はありません。
よって、テーブル設定をアップデートする際に、ロケーション以外のテーブル設定も同時に上書きする必要があります。
(ロケーションの変更はだいぶ無理やりです。笑)

lib/resources/handler.ts
import { Glue, DynamoDB } from 'aws-sdk';

const handler = async (event: any, context: any, callback: any) => {
  // エクスポート
  const dynamoClient = new DynamoDB();
  const exportConfig = await dynamoClient.exportTableToPointInTime({
    TableArn: process.env.DYNAMO_TABLE_ARN!,
    S3Bucket: process.env.EXPORT_S3_BUCKET!,
    ExportFormat: 'DYNAMODB_JSON',
  }).promise();

  // テーブル設定変更
  const glue = new Glue();
  const tableConfig = await glue.getTable({
    DatabaseName: process.env.GLUE_DATABASE_NAME!,
    Name: process.env.GLUE_TABLE_NAME!,
  }).promise();

  const id = exportConfig.ExportDescription?.ExportArn?.split('/').slice(-1)[0];
  const newLocation = tableConfig.Table?.StorageDescriptor?.Location?.replace(/[0-9a-z]+-[0-9a-z]+\/data\//, `${id}/data/`);
  await glue.updateTable({
    DatabaseName: tableConfig.Table?.DatabaseName!,
    TableInput: {
      Name: tableConfig.Table?.Name!,
      StorageDescriptor: {
        ...tableConfig.Table?.StorageDescriptor,
        Location: newLocation,
      },
      PartitionKeys: tableConfig.Table?.PartitionKeys,
      TableType: tableConfig.Table?.TableType,
      Parameters: tableConfig.Table?.Parameters,
    }
  }).promise();
};

定期的にエクスポートするためのスケジュール設定をする

最後に、先ほど作成したLambda FunctionをCloudWatch Eventsを用いて、定期的に実行することで、最新のデータをAthenaからクエリできるようにしていきます。
今回の例では、1時間に1回更新するように設定しています。

lib/demo-stack.ts
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as glue from '@aws-cdk/aws-glue';
import * as lambda from '@aws-cdk/aws-lambda';
import * as lambdaNode from '@aws-cdk/aws-lambda-nodejs';
import * as iam from '@aws-cdk/aws-iam';
import { Duration } from '@aws-cdk/core';
import * as events from '@aws-cdk/aws-events';
import * as eventsTargets from '@aws-cdk/aws-events-targets';

export class DemoStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    ...

    // スケジュール設定
    const cronEvent = new events.Rule(this, 'DataExportCronEvent', {
      schedule: events.Schedule.cron({
        minute: '0',
        hour: '*/1',
        day: '*',
        month: '*',
        year: '*',
      }),
      targets: [new eventsTargets.LambdaFunction(lambdaFunction)],
    });
  }
}

全体

ここまでをまとめたコードになります。

lib/demo-stack.ts
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as glue from '@aws-cdk/aws-glue';
import * as lambda from '@aws-cdk/aws-lambda';
import * as lambdaNode from '@aws-cdk/aws-lambda-nodejs';
import * as iam from '@aws-cdk/aws-iam';
import { Duration } from '@aws-cdk/core';
import * as events from '@aws-cdk/aws-events';
import * as eventsTargets from '@aws-cdk/aws-events-targets';

export class DemoStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    // データ用のDynamoDBテーブル
    const dynamodbTable = new ddb.Table(this, 'DataTable', {
      partitionKey: {
        name: 'id',
        type: ddb.AttributeType.STRING,
      },
      pointInTimeRecovery: true, // 必須
    });

    // エクスポート用のS3バケット
    const bucket = new s3.Bucket(this, 'ExportBucket', {
      bucketName: 'demo-export-bucket',
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
    });

    // Athena用のデータベースとテーブルの設定
    const database = new glue.Database(this, 'DemoDatabase', {
      databaseName: 'demo-db',
    });
    const table = new glue.Table(this, 'DemoUserTable', {
      bucket,
      s3Prefix: 'AWSDynamoDB/12345678901234-12345678/data/',
      database,
      tableName: 'user',
      columns: [
        {
          name: 'item',
          type: glue.Schema.struct([
            {
              name: 'id',
              type: glue.Schema.struct([{
                name: 'S',
                type: glue.Schema.STRING,
              }]),
            },
            {
              name: 'name',
              type: glue.Schema.struct([{
                name: 'S',
                type: glue.Schema.STRING,
              }]),
            },
            {
              name: 'createdAt',
              type: glue.Schema.struct([{
                name: 'S',
                type: glue.Schema.STRING,
              }]),
            },
          ])
        }
      ],
      dataFormat: glue.DataFormat.JSON,
    });

    // Lambda Functionの作成
    const lambdaFunction = new lambdaNode.NodejsFunction(this, 'DataExportFunction', {
      runtime: lambda.Runtime.NODEJS_14_X,
      entry: 'lib/resources/handler.ts',
      handler: 'handler',
      memorySize: 128,
      timeout: Duration.minutes(5),
    });
    lambdaFunction.addEnvironment('DYNAMO_TABLE_ARN', dynamodbTable.tableArn);
    lambdaFunction.addEnvironment('EXPORT_S3_BUCKET', bucket.bucketName);
    lambdaFunction.addEnvironment('GLUE_DATABASE_NAME', database.databaseName);
    lambdaFunction.addEnvironment('GLUE_TABLE_NAME', table.tableName);
    lambdaFunction.addToRolePolicy(new iam.PolicyStatement({
      actions: ['dynamodb:ExportTableToPointInTime'],
      resources: [
        dynamodbTable.tableArn,
      ],
      effect: iam.Effect.ALLOW,
    }));
    bucket.grantPut(lambdaFunction);
    lambdaFunction.addToRolePolicy(new iam.PolicyStatement({
      actions: ['glue:GetTable', 'glue:UpdateTable'],
      resources: [
        database.catalogArn,
        database.databaseArn,
        table.tableArn,
      ],
      effect: iam.Effect.ALLOW,
    }));

    // スケジュール設定
    const cronEvent = new events.Rule(this, 'DataExportCronEvent', {
      schedule: events.Schedule.cron({
        minute: '0',
        hour: '*/1',
        day: '*',
        month: '*',
        year: '*',
      }),
      targets: [new eventsTargets.LambdaFunction(lambdaFunction)],
    });
  }
}

Athenaでクエリを行う

最後に、Athenaでクエリを行ってみましょう。
例えばこんな感じで、日ごとの登録者数を見ることができます。
(日本のタイムゾーンでの日付で集計したいため、一部キャスト処理を行っています。)

SELECT
  COUNT(1),
  CAST(from_iso8601_timestamp(Item.createdAt.S) AT TIME ZONE 'Japan' AS DATE) as createdAt
FROM "demo-db"."user"
GROUP BY CAST(from_iso8601_timestamp(Item.createdAt.S) AT TIME ZONE 'Japan' AS DATE)

おわりに

今回は、CDKでDynamoDBのデータをAthenaで分析する環境を作ってみました。
もっといいやり方もあるかも知れませんので、ご存知の方はコメントなどいただけると嬉しいです。

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?