6
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AWS lambdaを使ってS3の特定ファイルの存在を確認する

Posted at

本記事でやること

  • S3の特定バケットに指定ファイルが存在しているのかを確認する
    • もしファイルの存在が確認出来たら他のlambdaを起動させる
    • もし確認出来なければ一定の時間を空けて再度チェックする

イメージとしては、以下のようなアーキテクチャを想定しています。
また、S3の指定ファイルは日時で作成されている事を前提としています。

スクリーンショット 2019-01-13 14.27.54.png

本記事でやらないことは以下の通りなので、他の記事を参照してください。

  • Cloudwatch Eventsの設定
  • IAMロールの作成
  • lambda関数の作成の仕方

対象読者

  • AWSの各サービスの役割を一通り知っている方
  • AWS lambdaを動かしてみたい方
  • Python入門者

使用言語

  • Python 3.6.3

S3の特定バケット以下の構成

  • ファイルの存在を確認するバケットの階層は以下のようになっている事を想定しています。
    • 前日の日付のディレクトリにファイルが存在しているかをチェックしていきます。(今日が2019-01-02であれば前日の2019-01-01にtest.txtがあるのかを確認します。)
bucket
├── 2019-01-01
│   └── test.txt
├── 2019-01-02
│   └── test.txt
├── 2019-01-03
│    └── test.txt
.
.
.
  • またファイルが存在していない場合は、250秒待って再度ファイルの確認をしていきます。ここら辺は各々都合の良いように変えてください。

lambdaスクリプト

lambda_function.py
import json
import time
import logging
import boto3
from datetime import datetime, timedelta, timezone


# データ確認用のバケット
BUCKET = "test"
# 存在を確認するファイル名
FILE_NAME = "test.txt"

# ファイルが存在していた場合の次に起動させるlambda関数名
JOB_SENDER = "batch-job-sender"
# ファイルが存在していない場合の再度起動させる自分の関数名
DATA_CHECKER = "data_checker"

# ファイルが存在していない場合の待ち時間
WAITING_TIME = 250

# loggerの作成
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def lambda_handler(event, _context):
    LOGGER.info(event)

    s3_client = boto3.client("s3")
    lambda_client = boto3.client("lambda")
    
    # 今日の日付を取得する
    str_dt = get_date()
    # 前日の日付を取得する
    one_days_before = add_days(str_dt, -1)
    
    # 指定ファイルが存在しているかどうかのステータス取得
    status = check_data(s3_client, one_days_before)
    execute(event, WAITING_TIME, lambda_client, status)


def execute(event, waiting_time, lambda_client, status):

    if status:  # 対象のバケットにファイルが存在しているとき
        lambda_client.invoke(
            FunctionName=JOB_SENDER,
            InvocationType="RequestResponse",
            Payload=json.dumps(event),
            LogType="Tail"
        )

    else:  # 対象のバケットにファイルが存在していないとき
        time.sleep(waiting_time)
        lambda_client.invoke(
            FunctionName=DATA_CHECKER,
            InvocationType="Event",
            Payload=json.dumps(event),
            LogType="None"
        )


def check_data(s3_client, date):

    prefix = date
    response = s3_client.list_objects(
        Bucket=BUCKET,
        Prefix=prefix
    )

    assumed_keys = [f'{date}/{FILE_NAME}']
    try:
        keys = [content['Key'] for content in response['Contents']]
        status = set(assumed_keys).issubset(keys)
    except KeyError:
        status = False

    return status


def get_date():
    jst = timezone(timedelta(hours=+9), "JST")
    jst_now = datetime.now(jst)
    dt = datetime.strftime(jst_now, "%Y-%m-%d")

    return dt


def datetime_to_str(date: datetime) -> str:
    year = str(date.year)
    month = str("{0:02d}".format(date.month))
    day = str("{0:02d}".format(date.day))
    str_date = '{0}-{1}-{2}'.format(year, month, day)

    return str_date


def str_to_datetime(str_date: str) -> datetime:

    return datetime.strptime(str_date, '%Y-%m-%d')


def add_days(str_dt: str, days: int) -> str:
    datetime_dt = str_to_datetime(str_dt)
    n_days_after = datetime_dt + timedelta(days=days)
    str_n_days_after = datetime_to_str(n_days_after)

    return str_n_days_after

終わりに

AWS S3内に毎日何がしかのファイルを生成させている場合や前日分に生成されたファイルを使って何か処理を行う場合などにlambdaを使って確認することが出来、次の処理を行うlambdaを起動させることが出来ます。
次は、前回書いた「AWS lambdaを使ってBatchにjobを投げる」を組み合わせて、「S3に指定ファイルが存在している事を確認したら次のlambdaを起動しBatchにジョブを投げる」アーキテクチャを実現させてみたいと思います。

6
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?