Kinesis データストリームを DynamoDB で保存する Lambda の環境構築です。
Kinesis データストリームの作成はこちら
AWSIOT: デバイスデータを Kinesis データストリームに送信
Lambda のプログラムはこちらです。
lambda_function.py
次の5ステップになります。
- DynamoDB でテーブルを作成
テーブル名 table_sep19
キー 名前 型 ハッシュキー deviceid String レインジキー timestamp String
- IAM でロールを作成
ロール名 role_lambda_sep19
- IAM でポリシーをロールにアタッチ
DynamoDB, Kinesis, CloudWatch にアクセスできるポリシーをアタッチ
- Lambda の関数を作成
関数名 lambda_sep19.py
- Lambda にトリガーを設定
Kinesis をトリガーにします。
##テーブルを作成##
create_table.sh
#
TABLE="table_sep19"
#
aws dynamodb create-table --table-name $TABLE \
--attribute-definitions '[{"AttributeName":"deviceid","AttributeType": "S"},{"AttributeName":"timestamp","AttributeType": "S"}]' \
--key-schema '[{"AttributeName":"deviceid","KeyType": "HASH"},{"AttributeName":"timestamp","KeyType": "RANGE"}]' \
--provisioned-throughput '{"ReadCapacityUnits": 1,"WriteCapacityUnits": 1}'
#
##ロールの作成##
create_role.sh
ROLL_LAMBDA="role_lambda_sep19"
#
aws iam create-role --role-name $ROLL_LAMBDA \
--assume-role-policy-document file://lambda-role-trust.json
lambda-role-trust.json
{
"Version":"2012-10-17",
"Statement":[{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}
##ポリシーをロールにアタッチ##
attach_role.sh
ROLE_LAMBDA=role_lambda_sep19
#
aws iam attach-role-policy --role-name $ROLE_LAMBDA \
--policy-arn "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess"
#
aws iam attach-role-policy --role-name $ROLE_LAMBDA \
--policy-arn "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
#
aws iam attach-role-policy --role-name $ROLE_LAMBDA \
--policy-arn "arn:aws:iam::aws:policy/AmazonKinesisFullAccess"
##Lambda の関数を作成##
参考プログラム
lambda_function.py
lambda_sep19.py
# -*- coding: utf-8 -*-
#
# lambda_sep19.py
#
# Sep/22/2021
#
# --------------------------------------------------------------------
import sys
import os
import base64
import json
import boto3
#from decimal import *
# --------------------------------------------------------------------
def insert_main_proc(table_name,payload):
sys.stderr.write("*** insert_main_proc *** start ***\n")
print(table_name)
#
deviceid = payload["DEVICE_NAME"]
timestamp = payload["TIMESTAMP"]
temperature = payload["TEMPERATURE"]
humidity = payload["HUMIDITY"]
print(temperature)
print(humidity)
#
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_name)
with table.batch_writer() as batch:
batch.put_item(
Item={
'deviceid': deviceid,
'timestamp': timestamp,
'temperature': temperature,
'humidity': humidity
}
)
sys.stderr.write("*** insert_main_proc *** end ***\n")
#
# --------------------------------------------------------------------
def lambda_handler(event, context):
sys.stderr.write("*** lambda_handler *** start ***\n")
#
table_name = os.environ.get('TABLE_NAME', "default")
print("Received event: " + json.dumps(event, indent=2))
sys.stderr.write("*** lambda_handler *** bbb ***\n")
data_in = event["Records"][0]["kinesis"]["data"]
decoded = base64.b64decode(data_in)
json_str = decoded.decode()
print(json_str)
payload = json.loads(json_str)
insert_main_proc(table_name,payload)
#
sys.stderr.write("*** lambda_handler *** end ***\n")
#
return "OK"
# --------------------------------------------------------------------
zip で圧縮
wget_zip.sh
#
rm -f *.py *.zip
#
NAME="lambda_sep19"
#
zip -r ${NAME}.zip ${NAME}.py
#
次のファイルが出来ます。
lambda_sep19.zip
create_lambda.sh
#
NAME="lambda_sep19"
aws lambda create-function \
--function-name ${NAME} \
--runtime python3.9 \
--role arn:aws:iam::123495704257:role/role_lambda_sep19 \
--environment "Variables={TABLE_NAME=table_sep19}" \
--handler lambda_sep19.lambda_handler \
--zip-file fileb://${NAME}.zip \
--region ap-northeast-1
#
##トリガーを設定##
create_event.sh
#
NAME="lambda_sep19"
#
aws lambda create-event-source-mapping --function-name $NAME \
--batch-size 500 --starting-position AT_TIMESTAMP \
--starting-position-timestamp 1541139109 \
--event-source-arn arn:aws:kinesis:ap-northeast-1:123495704257:stream/kinesis_sep19
#