概要
Lambda関数を使って削除を行います。
まず、条件に一致したS3オブジェクトのみを列挙し、該当するオブジェクトをS3.Client.delete_objects
メソッドで一括削除します。
コード
import boto3
import re
import os
BUCKET_NAME = 'test-s3-bucket' # 削除対象のS3バケット
PATTERN = r'^delete/target/prefix/2023080\d' # 削除対象のPrefix (正規表現)
# S3Client
s3_client = boto3.client('s3')
# 削除対象となるS3オブジェクトを列挙する
def list_objects_to_delete(bucket_name, pattern):
response = s3_client.list_objects_v2(Bucket=bucket_name)
objects_to_delete = []
for obj in response.get('Contents', []):
if re.match(pattern, obj['Key']):
objects_to_delete.append({'Key': obj['Key']})
return objects_to_delete
# objectsで与えられたS3オブジェクトを一括で削除する
def delete_objects(bucket_name, objects, should_delete):
if should_delete != 'True':
print("Delete flag is set to False. No object is deleted.")
return
if not objects:
print("No objects to delete.")
return
response = s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': objects})
if 'Deleted' in response:
print(f"{len(response['Deleted'])} objects deleted.")
if 'Errors' in response:
print(f"{len(response['Errors'])} objects couldn't be deleted due to errors.")
def lambda_handler(event, context):
# 安全装置
should_delete = event.get('delete', 'False')
# 対象となるオブジェクトをリストアップ
objects_to_delete = list_objects_to_delete(BUCKET_NAME, PATTERN)
if objects_to_delete:
print("Objects to be deleted:")
for obj in objects_to_delete:
print(obj['Key'])
delete_objects(BUCKET_NAME, objects_to_delete, should_delete)
else:
print("No objects matching the pattern to delete.")
return {
'statusCode': 200,
'body': 'Objects deletion complete.'
}
使い方
- Lambda関数を上記コードで作成し、
BUCKET_NAME
とPATTERN
に削除したいバケット名と正規表現パターンを設定する。 - TestEventを作成して、以下のJSON文をリクエストに設定する。
{ "delete": "False" }
- 作成したTestEventを実行するとデバッグログに削除対象となるオブジェクト一覧が出力されるため、削除想定のものが表示されているか確認する。
- TestEventに設定されている
delete
キーの値を"True"
に設定して、再度Lambda関数を実行する。{ "delete": "True" }
- 削除完了