今回の課題
Lambdaを使用してRDSの最新スナップショットをS3に定期的にエクスポートしたい。
SnowflakeのTASK機能を使ってデータをロードするときに新しいスナップショットだけがS3にある方が便利なので、
古いスナップショットはLambdaを実行するたびに削除するようにした。
実装手順
Lambda関数を作成する
下記のコードは、lambda_handler()
を実行すると以下のような処理が走るようになっている。
-
create_snapshot()
を呼び出してスナップショットを作成 -
create_snapshot()
と同時にcheck_snapshot_created()
も呼び出されるようになっていて、スナップショットが作成可能な状態まで待ってから、スナップショット作成の処理が走るように設定されている。 -
mysnapshot-YYYY-mm-dd-HH-MM
というS3オブジェクトが削除される - 最新の
mysnapshot-YYYY-mm-dd-HH-MM
というスナップショットがRDSからS3にロードされる。
Lambdaのコード
import json
import boto3
import time
from botocore.client import ClientError
from datetime import datetime
rds = boto3.client('rds')
s3 = boto3.resource('s3')
S3_BUCKET_NAME="s3bucket" # 出力先のS3バケット名
S3_OBJECT_NAME="s3object" #出力先のS3オブジェクト
IAM_ROLE_ARN="arn:aws:iam::************:role/s3OperationsRole" # S3にエクスポートする際に使用するロールのARN
KMS_KEY_ID="arn:aws:kms:ap-northeast-1:************:key/31b0e6b4-54a8-4b32-97c3-98e3ff4412be" # 作成したKMSキーのARN
# create_snapshot()を呼び出せば、[prefix-YYYY-mm-dd-HH-MM]というスナップショットの作成が始まるように定義
def create_snapshot(prefix, instanceid):
newsnapshotid = "-".join([prefix, datetime.now().strftime("%Y-%m-%d-%H-%M")])
rds.create_db_snapshot(
DBSnapshotIdentifier=newsnapshotid,
DBInstanceIdentifier=instanceid
)
# check_snapshot_created()を呼び出すと、RDSのStatusが「available」になるまで待機する関数
# lambda_handler()にて、create_snapshot()と同時に呼び出すことで、スナップショットを作成可能な状態まで待ってスナップショット作成の処理が走るように定義している。
def check_snapshot_created(prefix, instanceid):
snapshots = rds.describe_db_snapshots(DBInstanceIdentifier=instanceid)['DBSnapshots']
# RDSのStatusが「available」でなければ、20秒間待機して、check_snapshot_createdをもう一度呼び出すということを繰り返す。
for snapshot in snapshots:
if snapshot['Status'] != "available":
time.sleep(20)
check_snapshot_created(prefix, instanceid)
def export_snapshot(prefix, instanceid):
# rdsのスナップショットの情報を`snapshots`に入れる
snapshots = rds.describe_db_snapshots(DBInstanceIdentifier=instanceid, SnapshotType='manual')['DBSnapshots']
# snapshotが新しい順にソートして、`snapshots`に入れる
snapshots = sorted(snapshots, key=lambda x: x['SnapshotCreateTime'], reverse=True)
# 一番新しいスナップショットを`snapshot`に入れる
snapshot = snapshots[0]
newsnapshotid = "-".join([prefix, datetime.now().strftime("%Y-%m-%d-%H-%M")])
# 一番新しいスナップショットをS3にエクスポートする
response = rds.start_export_task(
ExportTaskIdentifier=newsnapshotid,
SourceArn=snapshot['DBSnapshotArn'],
S3BucketName=S3_BUCKET_NAME,
S3Prefix=S3_OBJECT_NAME,
IamRoleArn=IAM_ROLE_ARN,
KmsKeyId=KMS_KEY_ID,
)
return(response)
def lambda_handler(event, context):
bucket = S3.Bucket("s3bucket")
snapshot_prefix = 'mysnapshot'
instance = 's3object'
# スナップショットを作成
create_snapshot(snapshot_prefix, instance)
# スナップショットが作成可能な状態になるまで待機
check_snapshot_created(snapshot_prefix, instance)
# s3のオブジェクトを削除
bucket.objects.filter(Prefix=S3_OBJECT_NAME).delete()
export_snapshot(snapshot_prefix, instance)
Cloudformationのテンプレートを作成する
先ほど作成した、LambdaのコードはZipFile
の項目にペーストする。
Resources:
PracticeLambda:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |
import json
import boto3
import time
from botocore.client import ClientError
from datetime import datetime
rds = boto3.client('rds')
s3 = boto3.resource('s3')
S3_BUCKET_NAME="s3bucket"
S3_OBJECT_NAME="s3object"
IAM_ROLE_ARN="arn:aws:iam::************:role/s3OperationsRole"
KMS_KEY_ID="arn:aws:kms:ap-northeast-1:************:key/31b0e6b4-54a8-4b32-97c3-98e3ff4412be"
def create_snapshot(prefix, instanceid):
newsnapshotid = "-".join([prefix, datetime.now().strftime("%Y-%m-%d-%H-%M")])
rds.create_db_snapshot(
DBSnapshotIdentifier=newsnapshotid,
DBInstanceIdentifier=instanceid
)
def check_snapshot_created(prefix, instanceid):
snapshots = rds.describe_db_snapshots(DBInstanceIdentifier=instanceid)['DBSnapshots']
for snapshot in snapshots:
if snapshot['Status'] != "available":
time.sleep(20)
check_snapshot_created(prefix, instanceid)
def export_snapshot(prefix, instanceid):
snapshots = rds.describe_db_snapshots(DBInstanceIdentifier=instanceid, SnapshotType='manual')['DBSnapshots']
snapshots = sorted(snapshots, key=lambda x: x['SnapshotCreateTime'], reverse=True)
snapshot = snapshots[0]
newsnapshotid = "-".join([prefix, datetime.now().strftime("%Y-%m-%d-%H-%M")])
response = rds.start_export_task(
ExportTaskIdentifier=newsnapshotid,
SourceArn=snapshot['DBSnapshotArn'],
S3BucketName=S3_BUCKET_NAME,
S3Prefix=S3_OBJECT_NAME,
IamRoleArn=IAM_ROLE_ARN,
KmsKeyId=KMS_KEY_ID,
)
return(response)
def lambda_handler(event, context):
bucket = S3.Bucket("s3bucket")
snapshot_prefix = 'mysnapshot'
instance = "s3object"
create_snapshot(snapshot_prefix, instance)
check_snapshot_created(snapshot_prefix, instance)
bucket.objects.filter(Prefix=S3_OBJECT_NAME).delete()
snapshot_prefix = 'mysnapshot'
Role: arn:aws:iam::************:role/service-role/practice-Lambda-RDStoS3-role-idais11p
Handler: index.lambda_handler
Runtime: python3.10
Timeout: 900
LambdaTrigger:
Type: AWS::Events::Rule
Properties:
Description: "EventBridge Rule for triggering Lambda function"
ScheduleExpression: cron(0 21 * * ? *)
State: ENABLED
Targets:
- Arn: !GetAtt PracticeLambda.Arn
Id : PracticeLambdaTarget
PermissionForEventRule:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref PracticeLambda
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceArn: !GetAtt LambdaTrigger.Arn
AWS::Lambda::Function
について
ここでLambda関数の定義をしている。
Handlerの理解に時間がかかったため、以下にメモ。
-
Handler: index.lambda_handler
について-
Handler
という項目は、[Pythonファイル名].[Lambdaの関数名]
にする必要がある。 - Yamlのコードにpythonコードを書くと、index.pyというファイル名でLambda関数にデプロイされる。そのため、
index.lambda_handler
を記述しておく必要がある。
-
AWS::Events::Rule
について
ここでは、Lambda関数を定期実行するために必要なEventBridge
を定義している。
-
!GetAtt
について- パラメータは以下の2種類
-
logicalNameOfResource
:希望する属性を含むリソースの名前 -
attributeName
:リソースの属性の名前。(Arn
やPublicIp
等々)
-
- 上記のコードでは、
PracticeLambda
という論理名を付けたLambdaのArnを取得して、Targetsパラメータに渡している。
- パラメータは以下の2種類
-
Targets
パラメータについて- ルール(ここでは
EventsBridge
)がトリガーされたときに呼び出されるリソースを定義するための項目。
- ルール(ここでは
つまり、Lambda Trigger
という論理名の項目では、Events Bridge
のスケジュール(cron(0 21 * * ? *)
)がトリガーされたときに、
Targetsに指定しているLambda(PracticeLambda.Arn
)が呼び出される。という設定がされている。
AWS::Lambda::Permission
について
AWSサービス又は別のアカウントに関数の使用許可を付与するリソースのこと。
このリソースを使用することで、他のAWSリソースやサービスからLambda関数をトリガーする権限を付与することができる。
ここでは、EventBridge
がLambda関数をトリガーできるように権限を付与している。
-
!Ref
について- ParametersやResourceで指定したパラメータを参照するときに使う。
-
Action: InvokeFunctionに
ついて- Actionというパラメータは、Principalがファンクションに対して使用することができるアクションを定義する項目
- 設定している
InvokeFunction
は、Lambda関数の実行をトリガーするアクション
-
Principal: events.amazonaws.com
について- Lambda関数に対して、
EventBridge
からのトリガーを許可する。
- Lambda関数に対して、
-
SourceArn
について- 関数を呼び出すAWSリソースのARNを指定する。
まとめ
以上のテンプレートをAWS Cloudformationをデプロイすることで、
毎日、古いスナップショットが格納されたS3オブジェクトを削除したうえで、最新のスナップショットをエクスポートするLambda関数を実装することができた