LoginSignup
1
0

More than 1 year has passed since last update.

Lambdaで取得した画像をKinesis及びFirehose経由でS3にアップロードする

Last updated at Posted at 2021-10-10

Kinesisの動作確認のため、Lambdaから画像を流した時の個人メモ

(参考)Lambdaで取得した画像をKinesis Firehose経由でS3にアップロードする
(参考)Kinesis — Boto 3 Docs 1.9.96 documentation - Amazon AWS


Kinesis

  • 「データストリームの作成」を押下

000.JPG

  • シャード数は最小値として作成

001.JPG

  • 作成されたことを確認

002.JPG

Kinesis Firehose

003.JPG

  • Kinesisから転送するので「Amazon Kinesis Data Streams」とし、流す先としてS3を選択

021.JPG

  • 今回はS3 Buffer hintsを最小値に設定

005.JPG

007.JPG

動作確認

  • コンソール画面からデモデータを送信

008.JPG

  • しばらくしたら送信を止める

009.JPG

  • S3に出力されていることを確認

011.JPG

{
  "errorMessage": "An error occurred (InvalidArgumentException) when calling the PutRecord operation: This operation is not permitted on KinesisStreamAsSource delivery stream type.",
  "errorType": "InvalidArgumentException",
  "requestId": "e48441e7-7cb2-41e8-adea-23fba73b12b6",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 54, in lambda_handler\n    result = put_to_firehose(img)\n",
    "  File \"/var/task/lambda_function.py\", line 38, in put_to_firehose\n    response = firehose.put_record(\n",
    "  File \"/var/runtime/botocore/client.py\", line 386, in _api_call\n    return self._make_api_call(operation_name, kwargs)\n",
    "  File \"/var/runtime/botocore/client.py\", line 705, in _make_api_call\n    raise error_class(parsed_response, operation_name)\n"
  ]
}

Lambda

  • Python3.9にて作成

010.JPG

  • 自動生成されたロールを選択

012.JPG

  • ロールにアタッチされているポリシーを以下のように修正
    • 画像の取得元であるS3へのGetObjectを付与
      • 今回は取得したバケットの別フォルダにアップロードする
    • KinesisへのPutRecordを付与
    • logsはデフォルトのまま
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "s3:GetObject",
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:logs:(リージョン名):(AWSアカウント):*",
                "arn:aws:kinesis:(リージョン名):(AWSアカウント):stream/(ストリーム名)",
                "arn:aws:s3:::(バケット名)/*"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:(リージョン名):(AWSアカウント):log-group:/aws/lambda/testingKinesis:*"
        }
    ]
}
  • ソース修正
lambda_function.py
import os
import logging
import boto3
import json
import random   

# ログ設定
logger = logging.getLogger(__name__)
logger.setLevel(os.environ['LOG_LEVEL'])

# 環境変数から取得
BACKET_NAME = os.environ['BACKET_NAME']
FILE_PATH = os.environ['FILE_PATH']
STREAM_NAME = os.environ['STREAM_NAME']


# S3から指定したバケット、フォルダ、ファイル名の画像を取得する
# 取得できる画像サイズはLambdaのメモリーに依存
# return:画像のバイナリデータ
def get_img_from_s3():
    logger.debug('start get_img_from_s3()')

    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=BACKET_NAME, Key=FILE_PATH)
    body = response['Body'].read()

    logger.debug('end get_img_from_s3()')

    return body


# 画像をKinesisに流す
# img:画像のバイナリデータ
# return:プットした結果
def put_to_kinesis(img):
    logger.debug('start put_to_kinesis()')

    kinesis = boto3.client('kinesis')
    response = kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=img,
        PartitionKey=str(random.randint(1,4))
    )

    logger.debug('end put_to_kinesis()')

    return response


def lambda_handler(event, context):
    logger.debug('start lambda_handler()')

    img = get_img_from_s3()
    result = put_to_kinesis(img)

    logger.debug('end lambda_handler()')

    return result

  • 環境変数設定

014.JPG

動作確認

感想

  • S3 Buffer hintsを最小値にしたことで1分たたずにS3にアップロードされた
    • 厳密に1分ではなかった
  • 連続で画像を流した場合は1ファイルとしてまとめられる
    • 直接Firehoseに流した場合と同様
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0