はじめに
今回はLambdaを使ってDynamoDBの既存データに新しい属性を追加していきたいと思います。
これにより、DyanmoDBの既存データに対して一括で属性を追加できるようになります。
目次
・構成について
・DynamoDBの設定項目
・DynamoDBのCloudFormation作成
・Lambdaの設定項目
・LambdaのCloudFormation作成
・動作確認
・最後に
・参考
構成について
DyanmoDBとLambda、そしてS3で構成します。
DyanmoDBの既存データを一度CSVでダウンロードし、それを元にLambdaは新しい属性を追加するようにします。
AWS環境はCloudFormationで作成します。
S3の構築は以前の記事を参考に作成してください。
DyanmoDBの設定項目
DyanmoDBの設定項目はIDをパーティションキーとして、Timeをソートキーとしています。
それ以外の項目はCloudFormationを見て、確認してください。
DyanmoDBのCloudFormation作成
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
DynamoDB deployment
#------------------------------------------------------------#
#Parameters
#------------------------------------------------------------#
Parameters:
PJPrefix:
Description: Project Name
Type: String
Default: pj-ogataro
AllowedPattern: ^[a-zA-Z0-9\-]*$
Environment:
Description: Environment Name
Type: String
Default: dev
AllowedPattern: ^[a-zA-Z0-9\-]*$
#------------------------------------------------------------#
#Resources
#------------------------------------------------------------#
Resources:
DynamoDB:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub '${PJPrefix}-${Environment}-DynamoDB'
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: true
SSESpecification:
SSEEnabled: true
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: "ID"
AttributeType: "S"
- AttributeName: "Time"
AttributeType: "S"
KeySchema:
- AttributeName: "ID"
KeyType: "HASH"
- AttributeName: "Time"
KeyType: "RANGE"
Lambdaの設定項目
今回はnodejs.20.xを利用し、コードについてはS3から取得してくるようにします。
またnodejsのパッケージをLambdaレイヤーに格納し、Lambda Functionと紐づけます。
index.jsの内容は記載しておりますが、node_modulesなどは別途対応をお願いします。
index.jsで使用する環境変数についても設定しています。
上記以外の項目についてはCloudFormationの設定を見てご確認ください
LambdaのCloudFormation作成
LambdaとIAMロールを作成します
Lambda_CloudFormation
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
Lambda DynamodDB-Table-Update release202404
#------------------------------------------------------------#
#Parameters
#------------------------------------------------------------#
Parameters:
PJPrefix:
Description: Project Name
Type: String
Default: pj-ogataro
AllowedPattern: ^[a-zA-Z0-9\-]*$
Environment:
Description: Environment Name
Type: String
Default: dev
AllowedPattern: ^[a-zA-Z0-9\-]*$
#------------------------------------------------------------#
#Resources
#------------------------------------------------------------#
Resources:
LambdaLayer:
Type: 'AWS::Lambda::LayerVersion'
Properties:
LayerName: !Sub '${PJPrefix}-${Environment}-Nodejs-Layer'
CompatibleRuntimes:
- nodejs20.x
Content:
S3Bucket: !ImportValue
Fn::Sub: '${PJPrefix}-${Environment}-storage-Output'
S3Key: nodejs.zip
###Lambda
Function:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${PJPrefix}-${Environment}-function'
Runtime: nodejs20.x
Handler: index.handler
Layers:
- !Ref LambdaLayer
Code:
S3Bucket: !ImportValue
Fn::Sub: '${PJPrefix}-${Environment}-storage-Output'
S3Key: index.zip
Role: !GetAtt IAMRole.Arn
Timeout: 20
Environment:
Variables:
TABLE: !Sub ${PJPrefix}-${Environment}-DynamoDB
BUCKET: !Sub ${PJPrefix}-${Environment}-storage
CSV: !Sub ${PJPrefix}-${Environment}.csv
REGION: "ap-northeast-1"
###IAMロール
IAMRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub '${PJPrefix}-${Environment}-dynamodb-Role'
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: LambdaExecutionPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: 'log'
Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: arn:aws:logs:*:*:*
- Sid: 'DynamoDBWrite'
Effect: Allow
Action:
- dynamodb:PutItem
- dynamodb:GetItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
- dynamodb:Query
- dynamodb:Scan
- dynamodb:BatchWriteItem
- dynamodb:BatchGetItem
Resource: !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${PJPrefix}-${Environment}-DynamoDB'
- Sid: 'S3Get'
Effect: Allow
Action:
- s3:GetObject
Resource: !Sub 'arn:aws:s3:::${PJPrefix}-${Environment}-storage/*'
Lambda_index.js
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const {
DynamoDBDocumentClient,
UpdateCommand,
} = require('@aws-sdk/lib-dynamodb');
const { S3Client, GetObjectCommand } = require('@aws-sdk/client-s3');
const { parse } = require('csv-parse/sync');
const dynamoDbclient = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(dynamoDbclient);
exports.handler = async (event, context) => {
console.log(event);
try {
// 環境変数から値を取得
const bucketName = process.env.BUCKET;
const csvName = process.env.CSV;
const tableName = process.env.TABLE;
const region = process.env.REGION;
// 環境変数の値が正しく取得できているかを確認
console.log('Bucket Name:', bucketName);
console.log('CSV Name:', csvName);
console.log('Table Name:', tableName);
console.log('Region:', region);
// 環境変数の値が存在しない場合のエラーチェック
if (!bucketName || !csvName || !tableName || !region) {
throw new Error('環境変数 BUCKET, CSV, TABLE, REGION のいずれかが設定されていません');
}
// S3クライアントをリージョン指定で初期化
const s3Client = new S3Client({ region });
// S3にアップロードしたCSVを取得する
const s3Params = {
Bucket: bucketName,
Key: csvName,
};
const data = await s3Client.send(new GetObjectCommand(s3Params));
// CSVデータを文字列に変換
const csvDataString = await streamToString(data.Body);
console.log('csvData: ' + csvDataString);
// CSVの中身をパース
const records = parse(csvDataString, {
bom: true,
columns: true,
trim: true,
});
for (let record of records) {
// レコードの内容をログ出力
console.log('Record:', record);
const dynamoParams = new UpdateCommand({
TableName: tableName,
Key: {
ID: record['ID'],
Time: record['Time'],
},
ExpressionAttributeValues: {
':Flag': 1,
},
UpdateExpression: 'set Flag = :Flag',
});
// DynamoDBのデータを更新
await docClient.send(dynamoParams);
}
return {
statusCode: 200,
headers: {
'Access-Control-Allow-Origin': '*',
},
body: 'ok',
};
} catch (error) {
console.log(error);
return {
statusCode: 404,
headers: {
'Access-Control-Allow-Origin': '*',
},
body: error.message,
};
}
};
async function streamToString(stream) {
const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
return Buffer.concat(chunks).toString('utf-8');
}
動作確認
正常に構築ができたら動作確認をしていきます。
1.DynamoDBデータ作成&CSVダウンロード
作成したDyanmoDBの項目に適当に値を入力し、データを作成します。
作成が完了したら、アクションから結果をCSVダウンロードします。
ダウンロードしたファイルは、Lambdaで設定している環境変数と合わせます。
2.CSVアップロード
そして、CSVファイルを作成したS3にアップロードします。
3.Lambdaテスト実行
アップロードが完了したら、作成したLambdaにてテスト実行をします。
テストで下記のようなログが表示されたら成功です。
4.追加確認
DyanmoDBに無事Flag属性が追加されていることを確認します。
最後に
こちらでLambdaを使ってDynamoDBの既存データに新しい属性を追加する手順は完了です。
参考になれば、幸いです。
参考サイト
Lambda
DynamoDB