0.実装したいもの
1. ディレクトリ構成
infra/
├── bin/
├── cdk.out/
├── lambda_functions/
│ └── s3_to_dynamo.py // 実際の関数
├── lib/
│ ├── constructs/
│ │ ├── dynamo_db.ts
│ │ ├── lambda_for_s3_to_dynamodb.ts // 今回のメイン
│ │ ├── s3.ts
│ ├── stacks/
│ │ ├── infra-stack.ts
│ ├── utils.py // リムーバルポリシーの関数を定義しています(後述)
├── cdk.json
~~~省略
infra配下にcdkがinitされています。
基本的に1スタック構成でコンストラクトに切っている感じです。
lambda_functionsというディレクトリ名は
Code.fromAsset('lambda_functions')
で指定されていているもので、cdkコマンドを実行するディレクトリからの相対パスとなります。
2. スタック
infra.ts
import ~~~省略
export class InfraStack extends cdk.Stack {
constructor(
scope: Construct,
id: string,
props: cdk.StackProps & {
config: StackParameter;
},
) {
super(scope, id, props);
const {
envName,
} = props.config;
// S3バケット
const bucket = new S3Bucket(this, 'S3Bucket', { envName });
// DynamoDB
const db = new DynamoDB(this, 'DynamoDB', { envName });
const table = db.table;
// S3バケットからDynamoDBへのデータ転送用のLambda
const lambda = new S3ToDynamoDBLambda(this, 'S3ToDynamoDBLambda', { table, bucket });
3. 各コンストラクト
s3.ts
s3.ts
~~~~~省略
const bucket = new s3.Bucket(this, 'S3Bucket', {
bucketName: 'hogehoge-bucket-' + envName,
removalPolicy: createRemovalPolicy(envName),
});
bucket.grantReadWrite(loggingRole);
~~~
本旨とは関係ないですが、utils.pyにて
utils.py
import { RemovalPolicy } from "aws-cdk-lib";
import { EnvName } from "../parameter";
export function createRemovalPolicy(envName: EnvName): RemovalPolicy {
return envName === 'dev' ? RemovalPolicy.DESTROY : RemovalPolicy.RETAIN;
}
このようにしてリムーバルポリシーを定義しています。
dynamo_db.ts
dynamo_deb.ts
import { Construct } from 'constructs';
import { Table, AttributeType, BillingMode } from 'aws-cdk-lib/aws-dynamodb';
import { createRemovalPolicy } from '../utils';
import { EnvName } from '../../parameter';
interface DynamoDBProps {
envName: EnvName;
}
export class DynamoDB extends Construct {
public readonly table: Table;
constructor(scope: Construct, id: string, props: DynamoDBProps) {
super(scope, id);
const { envName } = props;
this.table = new Table(this, 'HogeTable', {
partitionKey: { name: 'id', type: AttributeType.STRING },
billingMode: BillingMode.PAY_PER_REQUEST,
removalPolicy: createRemovalPolicy(envName),
});
}
}
lambda_for_s3_to_dynamodb.ts
lambda_for_s3_to_dynamodb.ts
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as s3n from 'aws-cdk-lib/aws-s3-notifications';
import { Construct } from 'constructs';
import { Code } from 'aws-cdk-lib/aws-lambda';
interface S3ToDynamoDBLambdaProps {
bucket: s3.Bucket;
table: dynamodb.Table;
}
export class S3ToDynamoDBLambda extends Construct {
constructor(scope: Construct, id: string, props: S3ToDynamoDBLambdaProps) {
super(scope, id);
const { bucket, table } = props;
// Lambda Function
const lambdaFunction = new lambda.Function(this, 'S3ToDynamoDBFunction', {
runtime: lambda.Runtime.PYTHON_3_12,
handler: 's3_to_dynamo.handler', // s3_to_dynamo.pyのhandlerを使用することになる。
code: Code.fromAsset('lambda_functions'), //cdkを実行するディレクトリから見た相対パス。ここに実際の関数を配置する
environment: {
TABLE_NAME: table.tableName, //環境変数としてDynamoDBのテーブル名を渡す
},
});
// DynamoDBへの書き込み権限
table.grantWriteData(lambdaFunction);
// S3にオブジェクトが作成された際にLambdaをトリガー
bucket.addEventNotification( // ここがポイント!
s3.EventType.OBJECT_CREATED,
new s3n.LambdaDestination(lambdaFunction)
);
// S3バケットへの読み取り権限
bucket.grantRead(lambdaFunction);
}
}
この記事のメインの部分です。ポイントはaddEventNotificationメソッド
第一引数がイベントタイプ(s3.EventType)
第二引数が通知先(s3n.LambdaDestination)となります
ちなみにこれだけで裏側でLambdaに対して s3:InvokeFunction 権限を付与しているとのこと。
これによってLambdaに送られる通知構造は以下のようなJSON形式となっています。
以下の公式より引用
{
"Records":[
{
"eventVersion":"2.2",
"eventSource":"aws:s3",
"awsRegion":"us-west-2",
"eventTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
"eventName":"event-type",
"userIdentity":{
"principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
},
"requestParameters":{
"sourceIPAddress":"ip-address-where-request-came-from"
},
"responseElements":{
"x-amz-request-id":"Amazon S3 generated request ID",
"x-amz-id-2":"Amazon S3 host that processed the request"
},
"s3":{
"s3SchemaVersion":"1.0",
"configurationId":"ID found in the bucket notification configuration",
"bucket":{
"name":"amzn-s3-demo-bucket",
"ownerIdentity":{
"principalId":"Amazon-customer-ID-of-the-bucket-owner"
},
"arn":"bucket-ARN"
},
"object":{
"key":"object-key",
"size":"object-size in bytes",
"eTag":"object eTag",
"versionId":"object version if bucket is versioning-enabled, otherwise null",
"sequencer": "a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
}
},
"glacierEventData": {
"restoreEventData": {
"lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
"lifecycleRestoreStorageClass": "Source storage class for restore"
}
}
}
]
}
4. 関数部分の実装
lambda_functions/s3_to_dynamo.py
import boto3
import os
import json
dynamodb = boto3.resource('dynamodb')
table_name = os.environ.get('TABLE_NAME') # CDK側で環境変数として格納したテーブル名
# S3イベントを受け取り、その内容をDynamoDBに書き込む。
def handler(event, context):
try:
table = dynamodb.Table(table_name) # 対象のDynamoDBを特定
for record in event['Records']: # S3イベントデータから詳細情報を取得
s3_bucket_name = record['s3']['bucket']['name']
s3_object_key = record['s3']['object']['key']
item = {
'id': s3_object_key, # バケット内のオブジェクトの一意の識別子であるobject_keyをパーティションキーとする
'bucket': s3_bucket_name,
'event_time': record['eventTime']
}
table.put_item(Item=item)
return {
'statusCode': 200,
'body': json.dumps('Success')
}
except Exception as e:
print(f"Error processing S3 event: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps('Error')
}
5.挙動の確認
対象のS3にファイルをアップロードすると、、、
無事にDyanamoDBにメタデータが反映されました
なお、このS3のPUTイベント通知のタイミングについては「S3側でオブジェクトの作成が完了したタイミング」のようです。