Kinesisの動作確認のため、Lambdaから画像を流した時の個人メモ
(参考)Lambdaで取得した画像をKinesis Firehose経由でS3にアップロードする
(参考)Kinesis — Boto 3 Docs 1.9.96 documentation - Amazon AWS
Kinesis
- 「データストリームの作成」を押下
- シャード数は最小値として作成
- 作成されたことを確認
Kinesis Firehose
- 「配信ストリームを使用した処理」を押下し、Firehose作成画面に遷移する
-
Lambdaで取得した画像をKinesis Firehose経由でS3にアップロードするにて
作成したFirehoseはインプット元として「Direct PUT」を選択したため、流用できない
-
Lambdaで取得した画像をKinesis Firehose経由でS3にアップロードするにて
- Kinesisから転送するので「Amazon Kinesis Data Streams」とし、流す先としてS3を選択
- 今回はS3 Buffer hintsを最小値に設定
-
作成されたことを確認
動作確認
- コンソール画面からデモデータを送信
- しばらくしたら送信を止める
- S3に出力されていることを確認
-
Lambdaで取得した画像をKinesis Firehose経由でS3にアップロードするのようにLambdaからPutRecordするとエラーとなる
- Firehose作成時に「Direct PUT」を選択していないから?
{
"errorMessage": "An error occurred (InvalidArgumentException) when calling the PutRecord operation: This operation is not permitted on KinesisStreamAsSource delivery stream type.",
"errorType": "InvalidArgumentException",
"requestId": "e48441e7-7cb2-41e8-adea-23fba73b12b6",
"stackTrace": [
" File \"/var/task/lambda_function.py\", line 54, in lambda_handler\n result = put_to_firehose(img)\n",
" File \"/var/task/lambda_function.py\", line 38, in put_to_firehose\n response = firehose.put_record(\n",
" File \"/var/runtime/botocore/client.py\", line 386, in _api_call\n return self._make_api_call(operation_name, kwargs)\n",
" File \"/var/runtime/botocore/client.py\", line 705, in _make_api_call\n raise error_class(parsed_response, operation_name)\n"
]
}
Lambda
- Python3.9にて作成
- 自動生成されたロールを選択
- ロールにアタッチされているポリシーを以下のように修正
- 画像の取得元であるS3へのGetObjectを付与
- 今回は取得したバケットの別フォルダにアップロードする
- KinesisへのPutRecordを付与
- logsはデフォルトのまま
- 画像の取得元であるS3へのGetObjectを付与
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"s3:GetObject",
"logs:CreateLogGroup"
],
"Resource": [
"arn:aws:logs:(リージョン名):(AWSアカウント):*",
"arn:aws:kinesis:(リージョン名):(AWSアカウント):stream/(ストリーム名)",
"arn:aws:s3:::(バケット名)/*"
]
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:(リージョン名):(AWSアカウント):log-group:/aws/lambda/testingKinesis:*"
}
]
}
- ソース修正
lambda_function.py
import os
import logging
import boto3
import json
import random
# ログ設定
logger = logging.getLogger(__name__)
logger.setLevel(os.environ['LOG_LEVEL'])
# 環境変数から取得
BACKET_NAME = os.environ['BACKET_NAME']
FILE_PATH = os.environ['FILE_PATH']
STREAM_NAME = os.environ['STREAM_NAME']
# S3から指定したバケット、フォルダ、ファイル名の画像を取得する
# 取得できる画像サイズはLambdaのメモリーに依存
# return:画像のバイナリデータ
def get_img_from_s3():
logger.debug('start get_img_from_s3()')
s3 = boto3.client('s3')
response = s3.get_object(Bucket=BACKET_NAME, Key=FILE_PATH)
body = response['Body'].read()
logger.debug('end get_img_from_s3()')
return body
# 画像をKinesisに流す
# img:画像のバイナリデータ
# return:プットした結果
def put_to_kinesis(img):
logger.debug('start put_to_kinesis()')
kinesis = boto3.client('kinesis')
response = kinesis.put_record(
StreamName=STREAM_NAME,
Data=img,
PartitionKey=str(random.randint(1,4))
)
logger.debug('end put_to_kinesis()')
return response
def lambda_handler(event, context):
logger.debug('start lambda_handler()')
img = get_img_from_s3()
result = put_to_kinesis(img)
logger.debug('end lambda_handler()')
return result
- 環境変数設定
動作確認
感想
- S3 Buffer hintsを最小値にしたことで1分たたずにS3にアップロードされた
- 厳密に1分ではなかった
- 連続で画像を流した場合は1ファイルとしてまとめられる
- 直接Firehoseに流した場合と同様