この記事で紹介していること
- Lambda×Pythonで、S3のオブジェクト移動
- boto3の制約を乗り越え、1000件以上のオブジェクト移動
- Lambda×Step Functionsで実行制限(15分)を超越
この記事がQiita初投稿です。
普段はNTT東日本で、ひかりクラウドスマートスタディの開発エンジニアをしております。
この記事が有益であると思った方は是非、イイねをお願いします!
前置き
S3のアクセスログからコンテンツの利用頻度をAthenaを使って分析したいという要望を受け、
早速バケットの中身を見てみると、過去数年分の溜まりに溜まったアクセスログが一つのプレフィックスに格納されていた。。。(絶望)※ちなみにS3にはフォルダという概念はないらしい
あまりにもオブジェクト数が多すぎるため、今後は分析の下準備としてLambdaを使って日毎にアクセスログをコピー削除するバッチを作成し、分析しやすくした過程を下記で説明する。
やりたいこと
-
毎日、一箇所のS3プレフィックスに溜まっていくアクセスログを、日付毎のプレフィックスにコピーする。コピー元のログは削除すること。
- コピー元
s3://bucket_name/src_prefix
- コピー先
s3://bucket_name/dst_prefix/YYYY/MM/DD
やったこと
1. Lambda単体でオブジェクトを動かしてみる
まず愚直にLmabdaを書いていく。
目指すのは下記イメージ。
リファレンスを見ると、list_objects_v2が使えそうなので、
まずはS3のコピー元のオブジェクトをリストして、対象を絞るとこから書いてみた。
#前日の日付を取得
today = datetime.date.today()
yesterday= datetime.timedelta(days=1)
input_day= today - yesterday
year_month_day = input_day.strftime('%Y/%m/%d/')
S3 = boto3.client('s3', region_name= 'xxxxxxxxxxxx')
SNS = boto3.client('sns')
bucket_name = os.environ['Bucket_name']
src_prefix = os.environ['Src_prefix']
#コピー先の前日分のプレフィックス生成
dst_prefix = 's3/' + year_month_day
#最大コピー数
Max_Key = 1000
#コピー元にある対象オブジェクトをリスト
Src_list = S3.list_objects_v2(
Bucket=Bucket_name,
Prefix=Src_prefix + str(input_day),
MaxKeys=Max_Key
)
#リスト結果からメタ情報を取得し、Add_listに追加
for Content in Src_list['Contents']:
src_path = Content['Key']
# Keyからオブジェクト名を生成(YYYY-MM-DD-xxxxxxx)
dst_file_name=os.path.basename(src_path)
#プレフィックスとオブジェクト名を再度結合
Copy_to_path=os.path.join(
dst_prefix,
dst_file_name
)
次に作成したリストとプレフィックスを用いて、コピーと削除を実行。
try:
s3.copy_object(
Bucket=bucket_name,
Key=dst_path,
CopySource={
'Bucket': bucket_name,
'Key': src_path
}
)
# コピー失敗のログ出力
except:
logger.error(
"Copy_Error:[%s] can't copy.", str(src_path))
# srcオブジェクト削除
else:
try:
s3.delete_object(
Bucket=bucket_name,
Key=src_path,
)
#削除失敗のログ出力
except:
logger.error(
"Delete Error:[%s] can't delete.", str(src_path)
)
最後に繋げて実行したところ、コピー先にファイルがあることは確認できたものの、なぜかコピー元にもファイルが残っていた。Boto3リファレンスをよく読んでみると、
Returns some or all (up to 1,000) of the objects in a bucket with each request.
list_objects_v2は一度に1000件しかオブジェクトをリストできないため、1001件目以降が残されていた。
2. 1000件以上のオブジェクトを繰り返し動かしてみる
色々調べてみると、1000件以上のオブジェクトをリストするには、”NextContinuationToken”を使うのがいいらしく、list後にこれがある場合とない場合で条件分岐を入れ、Whileループさせるように変更してみた。
next_token = ''
while True:
if next_token == '':
src_list = S3.list_objects_v2(
Bucket=bucket_name,
Prefix=src_prefix + str(yesterday),
MaxKeys=MAX_KEY
)
else:
src_list = S3.list_objects_v2(
Bucket=bucket_name,
Prefix=src_prefix + str(yesterday),
MaxKeys=MAX_KEY,
ContinuationToken=next_token
)
# リスト結果からメタ情報を取得
for content in src_list['contents']:
src_path = content['Key']
# コピー先のprefixとオブジェクト名を生成
# 'Key'からオブジェクト名だけ抽出(Keyのままだとオブジェクトとして認識されない。
dst_file_name = os.path.basename(src_path)
# プレフィックスとオブジェクト名を再度結合
copy_to_path = os.path.join(
dst_prefix,
dst_file_name
)
# コピー実行
try:
s3.copy_object(
Bucket=bucket_name,
Key=dst_path,
CopySource={
'Bucket': bucket_name,
'Key': src_path
}
)
# コピー失敗のログ出力
except:
logger.error(
"Copy_Error:[%s] can't copy.", str(src_path))
# srcオブジェクト削除
else:
try:
s3.delete_object(
Bucket=bucket_name,
Key=src_path,
)
# 削除失敗のログ出力
except:
logger.error(
"Delete Error:[%s] can't delete.", str(src_path)
)
# List1000件以上対応
if 'NextContinuationToken' in src_list:
next_token = src_list['NextContinuationToken']
else:
print('END')
これで1000件以上のオブジェクトをリストして、コピーさせることに成功。(もちろんコピー元は削除済み)
しかし、本番環境と同じように1万件以上のオブジェクトを用意して実行したところ、Lambdaの実行制限(15分)の壁が立ち塞がり、処理がTime outした。
Task timed out
3. Lambdaの実行制限を超越するため、Step Functionsで繰り返し処理する。
1万件以上のオブジェクト操作をLambdaの実行制限に負けずに実行させるために、Step Functions を用いて、繰り返し実行させてみることにした。
スクリプトの変更点は下記の通り。(シニアエンジニアからアドバイスをもらった)
- eventで日付を引数にすることで、指定日でも実行可能とする。(コピー失敗時のリカバリ)
- Whileループの削除(StepFuctionsのChoiceステートで分岐させるため)
- 関数化・定数化できる部分をmain関数から外だし。
- エラー時はSNSを使ってSlack通知。
- バッチとして処理させるため、EventBridgeから発火させる。
import os
import datetime
from datetime import datetime as dt
import logging
import boto3
s3 = boto3.client('s3', region_name='xxxxxxxxxxxxx')
sns = boto3.client('sns')
# 環境変数の取得(バケット名,コピー元,sns送信先,ログレベル,環境名)
bucket_name = os.environ['bucket_name']
src_prefix = os.environ['src_prefix']
sns_topic_arn = os.environ['sns_topic_arn']
LOG_LEVEL = os.environ['LOG_LEVEL']
SUBJECT = os.environ['subject']
# 最大リスト件数
MAX_KEY = 1000
# dstの日毎のプレフィックスフォーマット
PREFIX_FORMAT = '%Y/%m/%d'
# ログレベルの設定
logger = logging.getLogger()
logger.setLevel(LOG_LEVEL)
def check_day(target):
# 8桁の文字列が入力された場合
if len(target) == 8:
# targetに8桁の数字が入力された場合:True
if str.isdigit(target):
return True
# targetに8桁だが、数字以外が入力された場合:False
else:
logger.error('Invalid input value:%s', target)
return False
# それ以外の場合:False
else:
logger.error('Characters not match:%s', target)
return False
def send_message(message, subject):
sns.publish(
TopicArn=sns_topic_arn,
Message=message,
Subject=subject
)
#メイン関数
def lambda_handler(event, context)
# コピー及び削除に失敗した場合の追加先リスト
error_obj = []
target = event['target']
# check_day()がFalseの場合は、 return {'result':xxxx}で終了処理
if target:
if check_day(target) is False:
send_message("Your request content is not existed:" + str(target),
"["+SUBJECT+"] REQUEST ERROR")
return {'result': 'Invalid request'}
# 手動実行の場合の入力値を変換(YYYY/MM/DD,YYYY-MM-DD)
else:
target = dt.strptime(target, '%Y%m%d')
input_day = target.strftime('%Y-%m-%d')
year_month_day = target.strftime(PREFIX_FORMAT)
logger.info('This process is by yourself.')
# EventBridgeによる実行の場合は、前日の日付を変換(YYYY/MM/DD,YYYY-MM-DD)
else:
input_day = datetime.date.today() - datetime.timedelta(days=1)
year_month_day = input_day.strftime(PREFIX_FORMAT)
logger.info('This process is daily task by EventBridge.')
logger.info('Target day is [%s]', input_day)
# コピー先のプレフィックス生成
dst_prefix = 'S3/' + year_month_day
logger.info('Target pass is [%s]', dst_prefix)
# コピー元オブジェクトをリスト(1000件まで)
src_list = s3.list_objects_v2(
Bucket=bucket_name,
Prefix=src_prefix + str(input_day),
MaxKeys=MAX_KEY
)
logger.debug('list:%s', src_list)
# src_listに対象コンテンツ(YYYY-MM-DD)がない場合
if 'Contents' not in src_list:
logger.error(
'Your request content is not existed. \n Check S3 [%s] and search target day [%s]', src_prefix, input_day)
# request errorメッセージ
send_message("Your request content is not existed\n Please Check S3:" +
src_prefix + str(input_day), "["+SUBJECT+"] REQUEST ERROR")
# return {'result':xxxx}で終了処理
return {'result': 'content is not existed'}
else:
# src_listに対象コンテンツ(YYYY-MM-DD)がある場合
# src_listをfor文で回し、メタ情報取得
for content in src_list['Contents']:
src_path = content['Key']
logger.debug('key:%s', src_path)
# Keyからオブジェクト名を生成(YYYY-MM-DD-xxxxxxx)
dst_file_name = os.path.basename(src_path)
# dstとオブジェクト名をjoinしてコピー先パスを生成
dst_path = os.path.join(
dst_prefix,
dst_file_name
)
# srcオブジェクトのコピー実行
try:
s3.copy_object(
Bucket=bucket_name,
Key=dst_path,
CopySource={
'Bucket': bucket_name,
'Key': src_path
}
)
# コピーできなかったsrcオブジェクトをリストへ追加
except:
logger.error(
"Copy_Error:[%s] can't copy.", str(src_path)
)
error_obj.append(src_path)
# srcオブジェクト削除
else:
try:
s3.delete_object(
Bucket=bucket_name,
Key=src_path,
)
# 削除できなかったsrcオブジェクトをリストへ追加
except:
logger.error(
"Delete Error:[%s] can't delete.", str(src_path)
)
error_obj.append(src_path)
# list_objects_v2でリストしきれなかった対象コンテンツ(YYYY-MM-DD)がある場合
if 'NextContinuationToken' in src_list:
token=src_list['NextContinuationToken']
logger.info('Since there is content left, processing continues.\n NextToken is [%s]',token)
return event
else:
# コピー削除に失敗したコンテンツがある場合
if error_obj:
logger.error(
'Some contents remain:%s', error_obj
)
send_message("Error Contents is \n[" + str(error_obj) +
"] \n running again", "["+SUBJECT+"] UNCOMPLICATED")
# eventをreturnし、再度ファンクション実行
return event
# コピー削除に失敗したコンテンツがない場合
else:
logger.info("Task is complete.Target day is [%s]",target)
send_message("The daily task of moving S3 access logs is complete.\nTarget is [" +
str(input_day) + "]", "["+SUBJECT+"] COMPLETE")
# Step Functionsの終了
return {'result': 'END'}
最後に
今回始めてpythonやLambdaを本格的に触ってみたしたが、下記点でつまずきました。
AWSでのつまずき
- list_object_v2の制限(1,000件まで)
- Lambdaの実行制限(15分)
Pythonでのつまづき
- 可読性の高いコードを書くこと。
- 出力結果を考えながら、コードを組み立てること
エンジニアに転身してから1ヶ月、かなり回りくどいやり方をしたと思います。
シニアエンジニアからのコードレビューをもらったりして、自分が初めてに書いていたコードから比べると、可読性も高く、エラーハンドリングがしやすくなりました。
次は先日のre:inventで発表されたStep Functions Distributed Mapを使って処理を簡略化させてみたいと思います。