本記事でやること
- S3の特定バケットに指定ファイルが存在しているのかを確認する
- もしファイルの存在が確認出来たら他のlambdaを起動させる
- もし確認出来なければ一定の時間を空けて再度チェックする
イメージとしては、以下のようなアーキテクチャを想定しています。
また、S3の指定ファイルは日時で作成されている事を前提としています。
本記事でやらないことは以下の通りなので、他の記事を参照してください。
- Cloudwatch Eventsの設定
- IAMロールの作成
- lambda関数の作成の仕方
対象読者
- AWSの各サービスの役割を一通り知っている方
- AWS lambdaを動かしてみたい方
- Python入門者
使用言語
- Python 3.6.3
S3の特定バケット以下の構成
- ファイルの存在を確認するバケットの階層は以下のようになっている事を想定しています。
- 前日の日付のディレクトリにファイルが存在しているかをチェックしていきます。(今日が2019-01-02であれば前日の2019-01-01にtest.txtがあるのかを確認します。)
bucket
├── 2019-01-01
│ └── test.txt
├── 2019-01-02
│ └── test.txt
├── 2019-01-03
│ └── test.txt
.
.
.
- またファイルが存在していない場合は、250秒待って再度ファイルの確認をしていきます。ここら辺は各々都合の良いように変えてください。
lambdaスクリプト
lambda_function.py
import json
import time
import logging
import boto3
from datetime import datetime, timedelta, timezone
# データ確認用のバケット
BUCKET = "test"
# 存在を確認するファイル名
FILE_NAME = "test.txt"
# ファイルが存在していた場合の次に起動させるlambda関数名
JOB_SENDER = "batch-job-sender"
# ファイルが存在していない場合の再度起動させる自分の関数名
DATA_CHECKER = "data_checker"
# ファイルが存在していない場合の待ち時間
WAITING_TIME = 250
# loggerの作成
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
def lambda_handler(event, _context):
LOGGER.info(event)
s3_client = boto3.client("s3")
lambda_client = boto3.client("lambda")
# 今日の日付を取得する
str_dt = get_date()
# 前日の日付を取得する
one_days_before = add_days(str_dt, -1)
# 指定ファイルが存在しているかどうかのステータス取得
status = check_data(s3_client, one_days_before)
execute(event, WAITING_TIME, lambda_client, status)
def execute(event, waiting_time, lambda_client, status):
if status: # 対象のバケットにファイルが存在しているとき
lambda_client.invoke(
FunctionName=JOB_SENDER,
InvocationType="RequestResponse",
Payload=json.dumps(event),
LogType="Tail"
)
else: # 対象のバケットにファイルが存在していないとき
time.sleep(waiting_time)
lambda_client.invoke(
FunctionName=DATA_CHECKER,
InvocationType="Event",
Payload=json.dumps(event),
LogType="None"
)
def check_data(s3_client, date):
prefix = date
response = s3_client.list_objects(
Bucket=BUCKET,
Prefix=prefix
)
assumed_keys = [f'{date}/{FILE_NAME}']
try:
keys = [content['Key'] for content in response['Contents']]
status = set(assumed_keys).issubset(keys)
except KeyError:
status = False
return status
def get_date():
jst = timezone(timedelta(hours=+9), "JST")
jst_now = datetime.now(jst)
dt = datetime.strftime(jst_now, "%Y-%m-%d")
return dt
def datetime_to_str(date: datetime) -> str:
year = str(date.year)
month = str("{0:02d}".format(date.month))
day = str("{0:02d}".format(date.day))
str_date = '{0}-{1}-{2}'.format(year, month, day)
return str_date
def str_to_datetime(str_date: str) -> datetime:
return datetime.strptime(str_date, '%Y-%m-%d')
def add_days(str_dt: str, days: int) -> str:
datetime_dt = str_to_datetime(str_dt)
n_days_after = datetime_dt + timedelta(days=days)
str_n_days_after = datetime_to_str(n_days_after)
return str_n_days_after
終わりに
AWS S3内に毎日何がしかのファイルを生成させている場合や前日分に生成されたファイルを使って何か処理を行う場合などにlambdaを使って確認することが出来、次の処理を行うlambdaを起動させることが出来ます。
次は、前回書いた「AWS lambdaを使ってBatchにjobを投げる」を組み合わせて、「S3に指定ファイルが存在している事を確認したら次のlambdaを起動しBatchにジョブを投げる」アーキテクチャを実現させてみたいと思います。