3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS CDKAdvent Calendar 2024

Day 17

CDKにて、S3へのPUTを検知してDynamoDBにメタデータを転送するLambdaを実装する

Last updated at Posted at 2024-12-24

0.実装したいもの

image.png

1. ディレクトリ構成

infra/
├── bin/
├── cdk.out/
├── lambda_functions/
   └── s3_to_dynamo.py // 実際の関数
├── lib/
   ├── constructs/
      ├── dynamo_db.ts
      ├── lambda_for_s3_to_dynamodb.ts // 今回のメイン
      ├── s3.ts
   ├── stacks/
      ├── infra-stack.ts
   ├── utils.py    // リムーバルポリシーの関数を定義しています(後述)
├── cdk.json
~~~省略

infra配下にcdkがinitされています。
基本的に1スタック構成でコンストラクトに切っている感じです。
lambda_functionsというディレクトリ名は

Code.fromAsset('lambda_functions')

で指定されていているもので、cdkコマンドを実行するディレクトリからの相対パスとなります。

2. スタック

infra.ts
import ~~~省略

export class InfraStack extends cdk.Stack {
  constructor(
    scope: Construct,
    id: string,
    props: cdk.StackProps & {
      config: StackParameter;
    },
  ) {
    super(scope, id, props);

    const {
      envName,
    } = props.config;

    // S3バケット
    const bucket = new S3Bucket(this, 'S3Bucket', { envName });

    // DynamoDB
    const db = new DynamoDB(this, 'DynamoDB', { envName });
    const table = db.table;

    // S3バケットからDynamoDBへのデータ転送用のLambda
    const lambda = new S3ToDynamoDBLambda(this, 'S3ToDynamoDBLambda', { table, bucket });

3. 各コンストラクト

s3.ts

s3.ts
~~~~~省略

const bucket = new s3.Bucket(this, 'S3Bucket', {
      bucketName: 'hogehoge-bucket-' + envName,
      removalPolicy: createRemovalPolicy(envName),
    });
    bucket.grantReadWrite(loggingRole);

~~~

本旨とは関係ないですが、utils.pyにて

utils.py
import { RemovalPolicy } from "aws-cdk-lib";
import { EnvName } from "../parameter";

export function createRemovalPolicy(envName: EnvName): RemovalPolicy {
  return envName === 'dev' ? RemovalPolicy.DESTROY : RemovalPolicy.RETAIN;
}

このようにしてリムーバルポリシーを定義しています。

dynamo_db.ts

dynamo_deb.ts
import { Construct } from 'constructs';
import { Table, AttributeType, BillingMode } from 'aws-cdk-lib/aws-dynamodb';
import { createRemovalPolicy } from '../utils';
import { EnvName } from '../../parameter';

interface DynamoDBProps {
  envName: EnvName;
}

export class DynamoDB extends Construct {
  public readonly table: Table;

  constructor(scope: Construct, id: string, props: DynamoDBProps) {
    super(scope, id);

    const { envName } = props;

    this.table = new Table(this, 'HogeTable', {
      partitionKey: { name: 'id', type: AttributeType.STRING },
      billingMode: BillingMode.PAY_PER_REQUEST, 
      removalPolicy: createRemovalPolicy(envName),
    });
  }
}

lambda_for_s3_to_dynamodb.ts

lambda_for_s3_to_dynamodb.ts
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as s3n from 'aws-cdk-lib/aws-s3-notifications';
import { Construct } from 'constructs';
import { Code } from 'aws-cdk-lib/aws-lambda';

interface S3ToDynamoDBLambdaProps {
  bucket: s3.Bucket;
  table: dynamodb.Table;
}

export class S3ToDynamoDBLambda extends Construct {
  constructor(scope: Construct, id: string, props: S3ToDynamoDBLambdaProps) {
    super(scope, id);

    const { bucket, table } = props;

    // Lambda Function
    const lambdaFunction = new lambda.Function(this, 'S3ToDynamoDBFunction', {
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 's3_to_dynamo.handler', // s3_to_dynamo.pyのhandlerを使用することになる。
      code: Code.fromAsset('lambda_functions'), //cdkを実行するディレクトリから見た相対パス。ここに実際の関数を配置する
      environment: {
        TABLE_NAME: table.tableName, //環境変数としてDynamoDBのテーブル名を渡す
      },
    });

    // DynamoDBへの書き込み権限
    table.grantWriteData(lambdaFunction);

    // S3にオブジェクトが作成された際にLambdaをトリガー
    bucket.addEventNotification( // ここがポイント!
      s3.EventType.OBJECT_CREATED,
      new s3n.LambdaDestination(lambdaFunction)
    );

    // S3バケットへの読み取り権限
    bucket.grantRead(lambdaFunction);
  }
}


この記事のメインの部分です。ポイントはaddEventNotificationメソッド

第一引数がイベントタイプ(s3.EventType)
第二引数が通知先(s3n.LambdaDestination)となります

ちなみにこれだけで裏側でLambdaに対して s3:InvokeFunction 権限を付与しているとのこと。

これによってLambdaに送られる通知構造は以下のようなJSON形式となっています。

以下の公式より引用

{  
   "Records":[  
      {  
         "eventVersion":"2.2",
         "eventSource":"aws:s3",
         "awsRegion":"us-west-2",
         "eventTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
         "eventName":"event-type",
         "userIdentity":{  
            "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
         },
         "requestParameters":{  
            "sourceIPAddress":"ip-address-where-request-came-from"
         },
         "responseElements":{  
            "x-amz-request-id":"Amazon S3 generated request ID",
            "x-amz-id-2":"Amazon S3 host that processed the request"
         },
         "s3":{  
            "s3SchemaVersion":"1.0",
            "configurationId":"ID found in the bucket notification configuration",
            "bucket":{  
               "name":"amzn-s3-demo-bucket",
               "ownerIdentity":{  
                  "principalId":"Amazon-customer-ID-of-the-bucket-owner"
               },
               "arn":"bucket-ARN"
            },
            "object":{  
               "key":"object-key",
               "size":"object-size in bytes",
               "eTag":"object eTag",
               "versionId":"object version if bucket is versioning-enabled, otherwise null",
               "sequencer": "a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
            }
         },
         "glacierEventData": {
            "restoreEventData": {
               "lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
               "lifecycleRestoreStorageClass": "Source storage class for restore"
            }
         }
      }
   ]
}

4. 関数部分の実装

lambda_functions/s3_to_dynamo.py
import boto3
import os
import json

dynamodb = boto3.resource('dynamodb')
table_name = os.environ.get('TABLE_NAME') # CDK側で環境変数として格納したテーブル名

# S3イベントを受け取り、その内容をDynamoDBに書き込む。
def handler(event, context):
    try:
        table = dynamodb.Table(table_name) # 対象のDynamoDBを特定
        
        for record in event['Records']: # S3イベントデータから詳細情報を取得
            s3_bucket_name = record['s3']['bucket']['name']
            s3_object_key = record['s3']['object']['key']

            item = {
                'id': s3_object_key,  # バケット内のオブジェクトの一意の識別子であるobject_keyをパーティションキーとする
                'bucket': s3_bucket_name,
                'event_time': record['eventTime']
            }
            table.put_item(Item=item)

        return {
            'statusCode': 200,
            'body': json.dumps('Success')
        }

    except Exception as e:
        print(f"Error processing S3 event: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps('Error')
        }


5.挙動の確認

対象のS3にファイルをアップロードすると、、、

スクリーンショット 2024-12-24 9.51.25.png

無事にDyanamoDBにメタデータが反映されました

スクリーンショット 2024-12-24 9.54.13.png

なお、このS3のPUTイベント通知のタイミングについては「S3側でオブジェクトの作成が完了したタイミング」のようです。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?