LoginSignup
2
1

More than 1 year has passed since last update.

Lambdaで取得した画像をKinesis Firehose経由でS3にアップロードする

Last updated at Posted at 2021-10-03

Firehoseの動作確認のため、Lambdaから画像を流した時の個人メモ

(参考)Firehose — Boto 3 Docs 1.9.42 documentation - Amazon AWS
(参考)BufferingHints


Kinesis Firehose

  • 「Create delivery stream」を押下

000.JPG

  • Kinesisから転送はしないので「Direct PUT」とし、流す先としてS3を選択

001.JPG

  • S3バケット名を指定
    • ここで指定したバケットへのアクセスを含めたIAMロールが自動生成される

015.JPG

  • 他はデフォルトのまま、作成

004.JPG

  • 数分後、作成されたことを確認

005.JPG

Lambda

  • Python3.9にて作成

000.JPG

  • 自動生成されたロールを選択

001.JPG

  • ロールにアタッチされているポリシーを以下のように修正
    • 画像の取得元であるS3へのGetObjectを付与
      • 今回は取得したバケットの別フォルダにアップロードする
    • FirehoseへのPutRecordを付与
    • logsはデフォルトのまま
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "firehose:PutRecord",
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:logs:(リージョン名):(AWSアカウント):*",
                "arn:aws:s3:::(バケット名)/*",
                "arn:aws:firehose:(リージョン名):(AWSアカウント):deliverystream/(ストリーム名)"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:(リージョン名):(AWSアカウント):log-group:/aws/lambda/testingKinesisFirehose:*"
        }
    ]
}
  • ソース修正
lambda_function.py
import os
import logging
import boto3
import json

# ログ設定
logger = logging.getLogger(__name__)
logger.setLevel(os.environ['LOG_LEVEL'])

# 環境変数から取得
BACKET_NAME = os.environ['BACKET_NAME']
FILE_PATH = os.environ['FILE_PATH']
DELIVERY_STREAM = os.environ['DELIVERY_STREAM']


# S3から指定したバケット、フォルダ、ファイル名の画像を取得する
# 取得できる画像サイズはLambdaのメモリーに依存
# return:画像のバイナリデータ
def get_img_from_s3():
    logger.debug('start get_img_from_s3()')

    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=BACKET_NAME, Key=FILE_PATH)
    body = response['Body'].read()

    logger.debug('end get_img_from_s3()')

    return body


# 画像をFirehoseに流す
# img:画像のバイナリデータ
# return:プットした結果
def put_to_firehose(img):
    logger.debug('start put_to_firehose()')

    firehose = boto3.client('firehose')
    response = firehose.put_record(
        DeliveryStreamName=DELIVERY_STREAM,
        Record={
            'Data': img
        }
    )

    logger.debug('end put_to_firehose()')

    return response


def lambda_handler(event, context):
    logger.debug('start lambda_handler()')

    img = get_img_from_s3()
    result = put_to_firehose(img)

    logger.debug('end lambda_handler()')

    return result

  • 環境変数設定

019.JPG

動作確認

  • 流す画像をS3に配置

020.JPG

  • 任意のテストイベントJSONを設定し、実行

021.JPG

  • Lambda実行後、5分後にS3を確認し、ファイルをダウンロード

022.JPG

023.JPG

  • 取得したファイルに拡張子をつけ、pngファイルとして開く

024.JPG

感想

  • 仕様的に流れてきたログ等をいったん溜めてS3にアップロードしている
    • S3へのアップロードが5MBか300秒に達したときになっているため
      連続して流した場合1つのファイルに2つ分の画像が含まれる
    • ファイル名がFirehoseによってリネームされる
      • ログ等をまとめてS3に流すのであれば
        一意のファイル名をつけてくれるので助かるが……

026.JPG

029.JPG

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1