はじめに
こんにちは。
本日は、CDKでDynamoDBのデータをAthenaで分析する環境の作り方について紹介していきます。
DynamoDBは高パフォーマンスでフルマネージドなキー・バリューデータベースですが、その性質上、検索については一括スキャン、もしくは、インデックスに基づく検索(最大5種類)とその絞り込みくらいしかできず、データの集計を行うにはわりと苦労します。
昨年より、DynamoDBにエクスポート機能が搭載されまして、データベースの本番稼働に負荷をかけることなく、S3にデータをエクスポートできるようになりました。
本日は、このエクスポート機能とサーバレスなクエリサービスであるAthenaを用いて、データの集計をできるような環境をCDKで構築する方法について紹介したいと思います。
今回は、以下のような簡易的なデータがDynamoDBのテーブルに入っていることを想定します。
id | name | createdAt |
---|---|---|
1 | Alice | 2021-09-01T02:00:00.000Z |
2 | Bob | 2021-09-02T10:00:00.000Z |
... | ... | ... |
エクスポート元のDynamoDBのテーブルを作成する
まずは、DynamoDBのテーブルを作成します。これはすでにあるテーブルでも問題ありません。
エクスポート機能を利用するには、PITR(ポイントインタイムリカバリ)を有効にする必要があります。
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
export class DemoStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
// データ用のDynamoDBテーブル
const dynamodbTable = new ddb.Table(this, 'DataTable', {
partitionKey: {
name: 'id',
type: ddb.AttributeType.STRING,
},
pointInTimeRecovery: true, // 必須
});
}
}
エクスポート先のS3バケットを作成する
次に、DynamoDBからエクスポートしたデータを保存するS3バケットを作成します。
バケットの名前はなんでも大丈夫です。
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
export class DemoStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
...
// エクスポート用のS3バケット
const bucket = new s3.Bucket(this, 'ExportBucket', {
bucketName: 'demo-export-bucket',
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
});
}
}
エクスポートしたデータをAthenaからクエリを行うための設定をする
次に、エクスポートしたデータに対してAthenaを用いてクエリを行えるようにしていきます。
現在では、Athenaのデータベースやテーブル設定は、AWS Glueというサービスに統合されています。
そのため、Athenaでクエリを行うためには、AWS Glueにてデータベースとテーブルの設定を行っていきます。
DynamoDBのエクスポート結果は、以下のようなJSONデータになっています。
{ "Item": { "id": { "S": "1" }, "name": { "S": "Alice" }, "createdAt": { "S": "2021-09-01T02:00:00.000Z" } } }
{ "Item": { "id": { "S": "2" }, "name": { "S": "Bob" }, "createdAt": { "S": "2021-09-02T10:00:00.000Z" } } }
そのため、Athenaからクエリを行う際は、SELECT name
ではなく、SELECT Item.name.S
という形になります。
よって、そのようなクエリが行えるよう、Glueのテーブル設定を行います。
S3のPrefixには、DynamoDBのエクスポートのルールに従い、ダミーで設定しています。
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as glue from '@aws-cdk/aws-glue';
export class DemoStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
...
// Athena用のデータベースとテーブルの設定
const database = new glue.Database(this, 'DemoDatabase', {
databaseName: 'demo-db',
});
const table = new glue.Table(this, 'DemoUserTable', {
bucket,
s3Prefix: 'AWSDynamoDB/12345678901234-12345678/data/',
database,
tableName: 'user',
columns: [
{
name: 'item',
type: glue.Schema.struct([
{
name: 'id',
type: glue.Schema.struct([{
name: 'S',
type: glue.Schema.STRING,
}]),
},
{
name: 'name',
type: glue.Schema.struct([{
name: 'S',
type: glue.Schema.STRING,
}]),
},
{
name: 'createdAt',
type: glue.Schema.struct([{
name: 'S',
type: glue.Schema.STRING,
}]),
},
])
}
],
dataFormat: glue.DataFormat.JSON,
});
}
}
エクスポートするためのLambda関数を作成する
次に、DynamoDBからS3へとエクスポートを行う処理を記述します。
この際、単にエクスポートするだけではGlueに設定したテーブル設定が、最新のエクスポート先に設定されていないため、最新のデータを取得できなくなります。
よって、エクスポートだけではなく、Glueのテーブル設定の変更も同時に行います。
まずは、CDKでLambda Functionを作成します。今回はNode.jsを使用しています。
同時に、Lambda Functionで利用する環境変数や権限も設定しています。
ざっくりいうと、以下のような権限が必要となります。
- エクスポート元となるDynamoDBのEXPORT権限
- エクスポート先となるS3バケットへのPUT権限
- 対象となるGlueのテーブルのREAD、UPDATE権限
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as glue from '@aws-cdk/aws-glue';
import * as lambda from '@aws-cdk/aws-lambda';
import * as lambdaNode from '@aws-cdk/aws-lambda-nodejs';
import * as iam from '@aws-cdk/aws-iam';
import { Duration } from '@aws-cdk/core';
export class DemoStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
// Lambda Functionの作成
const lambdaFunction = new lambdaNode.NodejsFunction(this, 'DataExportFunction', {
runtime: lambda.Runtime.NODEJS_14_X,
entry: 'lib/resources/handler.ts',
handler: 'handler',
memorySize: 128,
timeout: Duration.minutes(5),
});
lambdaFunction.addEnvironment('DYNAMO_TABLE_ARN', dynamodbTable.tableArn);
lambdaFunction.addEnvironment('EXPORT_S3_BUCKET', bucket.bucketName);
lambdaFunction.addEnvironment('GLUE_DATABASE_NAME', database.databaseName);
lambdaFunction.addEnvironment('GLUE_TABLE_NAME', table.tableName);
lambdaFunction.addToRolePolicy(new iam.PolicyStatement({
actions: ['dynamodb:ExportTableToPointInTime'],
resources: [
dynamodbTable.tableArn,
],
effect: iam.Effect.ALLOW,
}));
bucket.grantPut(lambdaFunction);
lambdaFunction.addToRolePolicy(new iam.PolicyStatement({
actions: ['glue:GetTable', 'glue:UpdateTable'],
resources: [
database.catalogArn,
database.databaseArn,
table.tableArn,
],
effect: iam.Effect.ALLOW,
}));
}
}
そして、以下がエクスポートとGlueのテーブルのアップデートを行うための処理となります。
現在、GlueではテーブルのターゲットとなるS3のロケーションのみを変更するような操作はありません。
よって、テーブル設定をアップデートする際に、ロケーション以外のテーブル設定も同時に上書きする必要があります。
(ロケーションの変更はだいぶ無理やりです。笑)
import { Glue, DynamoDB } from 'aws-sdk';
const handler = async (event: any, context: any, callback: any) => {
// エクスポート
const dynamoClient = new DynamoDB();
const exportConfig = await dynamoClient.exportTableToPointInTime({
TableArn: process.env.DYNAMO_TABLE_ARN!,
S3Bucket: process.env.EXPORT_S3_BUCKET!,
ExportFormat: 'DYNAMODB_JSON',
}).promise();
// テーブル設定変更
const glue = new Glue();
const tableConfig = await glue.getTable({
DatabaseName: process.env.GLUE_DATABASE_NAME!,
Name: process.env.GLUE_TABLE_NAME!,
}).promise();
const id = exportConfig.ExportDescription?.ExportArn?.split('/').slice(-1)[0];
const newLocation = tableConfig.Table?.StorageDescriptor?.Location?.replace(/[0-9a-z]+-[0-9a-z]+\/data\//, `${id}/data/`);
await glue.updateTable({
DatabaseName: tableConfig.Table?.DatabaseName!,
TableInput: {
Name: tableConfig.Table?.Name!,
StorageDescriptor: {
...tableConfig.Table?.StorageDescriptor,
Location: newLocation,
},
PartitionKeys: tableConfig.Table?.PartitionKeys,
TableType: tableConfig.Table?.TableType,
Parameters: tableConfig.Table?.Parameters,
}
}).promise();
};
定期的にエクスポートするためのスケジュール設定をする
最後に、先ほど作成したLambda FunctionをCloudWatch Eventsを用いて、定期的に実行することで、最新のデータをAthenaからクエリできるようにしていきます。
今回の例では、1時間に1回更新するように設定しています。
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as glue from '@aws-cdk/aws-glue';
import * as lambda from '@aws-cdk/aws-lambda';
import * as lambdaNode from '@aws-cdk/aws-lambda-nodejs';
import * as iam from '@aws-cdk/aws-iam';
import { Duration } from '@aws-cdk/core';
import * as events from '@aws-cdk/aws-events';
import * as eventsTargets from '@aws-cdk/aws-events-targets';
export class DemoStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
...
// スケジュール設定
const cronEvent = new events.Rule(this, 'DataExportCronEvent', {
schedule: events.Schedule.cron({
minute: '0',
hour: '*/1',
day: '*',
month: '*',
year: '*',
}),
targets: [new eventsTargets.LambdaFunction(lambdaFunction)],
});
}
}
全体
ここまでをまとめたコードになります。
import * as cdk from '@aws-cdk/core';
import * as ddb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as glue from '@aws-cdk/aws-glue';
import * as lambda from '@aws-cdk/aws-lambda';
import * as lambdaNode from '@aws-cdk/aws-lambda-nodejs';
import * as iam from '@aws-cdk/aws-iam';
import { Duration } from '@aws-cdk/core';
import * as events from '@aws-cdk/aws-events';
import * as eventsTargets from '@aws-cdk/aws-events-targets';
export class DemoStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
// データ用のDynamoDBテーブル
const dynamodbTable = new ddb.Table(this, 'DataTable', {
partitionKey: {
name: 'id',
type: ddb.AttributeType.STRING,
},
pointInTimeRecovery: true, // 必須
});
// エクスポート用のS3バケット
const bucket = new s3.Bucket(this, 'ExportBucket', {
bucketName: 'demo-export-bucket',
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
});
// Athena用のデータベースとテーブルの設定
const database = new glue.Database(this, 'DemoDatabase', {
databaseName: 'demo-db',
});
const table = new glue.Table(this, 'DemoUserTable', {
bucket,
s3Prefix: 'AWSDynamoDB/12345678901234-12345678/data/',
database,
tableName: 'user',
columns: [
{
name: 'item',
type: glue.Schema.struct([
{
name: 'id',
type: glue.Schema.struct([{
name: 'S',
type: glue.Schema.STRING,
}]),
},
{
name: 'name',
type: glue.Schema.struct([{
name: 'S',
type: glue.Schema.STRING,
}]),
},
{
name: 'createdAt',
type: glue.Schema.struct([{
name: 'S',
type: glue.Schema.STRING,
}]),
},
])
}
],
dataFormat: glue.DataFormat.JSON,
});
// Lambda Functionの作成
const lambdaFunction = new lambdaNode.NodejsFunction(this, 'DataExportFunction', {
runtime: lambda.Runtime.NODEJS_14_X,
entry: 'lib/resources/handler.ts',
handler: 'handler',
memorySize: 128,
timeout: Duration.minutes(5),
});
lambdaFunction.addEnvironment('DYNAMO_TABLE_ARN', dynamodbTable.tableArn);
lambdaFunction.addEnvironment('EXPORT_S3_BUCKET', bucket.bucketName);
lambdaFunction.addEnvironment('GLUE_DATABASE_NAME', database.databaseName);
lambdaFunction.addEnvironment('GLUE_TABLE_NAME', table.tableName);
lambdaFunction.addToRolePolicy(new iam.PolicyStatement({
actions: ['dynamodb:ExportTableToPointInTime'],
resources: [
dynamodbTable.tableArn,
],
effect: iam.Effect.ALLOW,
}));
bucket.grantPut(lambdaFunction);
lambdaFunction.addToRolePolicy(new iam.PolicyStatement({
actions: ['glue:GetTable', 'glue:UpdateTable'],
resources: [
database.catalogArn,
database.databaseArn,
table.tableArn,
],
effect: iam.Effect.ALLOW,
}));
// スケジュール設定
const cronEvent = new events.Rule(this, 'DataExportCronEvent', {
schedule: events.Schedule.cron({
minute: '0',
hour: '*/1',
day: '*',
month: '*',
year: '*',
}),
targets: [new eventsTargets.LambdaFunction(lambdaFunction)],
});
}
}
Athenaでクエリを行う
最後に、Athenaでクエリを行ってみましょう。
例えばこんな感じで、日ごとの登録者数を見ることができます。
(日本のタイムゾーンでの日付で集計したいため、一部キャスト処理を行っています。)
SELECT
COUNT(1),
CAST(from_iso8601_timestamp(Item.createdAt.S) AT TIME ZONE 'Japan' AS DATE) as createdAt
FROM "demo-db"."user"
GROUP BY CAST(from_iso8601_timestamp(Item.createdAt.S) AT TIME ZONE 'Japan' AS DATE)
おわりに
今回は、CDKでDynamoDBのデータをAthenaで分析する環境を作ってみました。
もっといいやり方もあるかも知れませんので、ご存知の方はコメントなどいただけると嬉しいです。