Saving a Kinesis Data Stream to DynamoDB

Posted at 2021-09-19

This article sets up a Lambda environment that saves records from a Kinesis data stream to DynamoDB.

For how to create the Kinesis data stream, see:
AWS IoT: Sending device data to a Kinesis data stream

The Lambda program is based on this one:
lambda_function.py

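The setup consists of the following five steps.
1) Create a table in DynamoDB

Table name: table_sep19

Key         Name       Type
Hash key    deviceid   String
Range key   timestamp  String

2) Create a role in IAM

Role name: role_lambda_sep19

3) Attach policies to the role in IAM

Attach policies that grant access to DynamoDB, Kinesis, and CloudWatch.

4) Create the Lambda function

Function name: lambda_sep19 (source file: lambda_sep19.py)

5) Set a trigger on the Lambda function

Use Kinesis as the trigger.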

Creating the table

create_table.sh
#
TABLE="table_sep19"
#
aws dynamodb create-table --table-name $TABLE \
--attribute-definitions '[{"AttributeName":"deviceid","AttributeType": "S"},{"AttributeName":"timestamp","AttributeType": "S"}]' \
--key-schema '[{"AttributeName":"deviceid","KeyType": "HASH"},{"AttributeName":"timestamp","KeyType": "RANGE"}]' \
--provisioned-throughput '{"ReadCapacityUnits": 1,"WriteCapacityUnits": 1}'
#
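
For readers who prefer Python to the CLI, the same table definition can be passed to boto3. This is a minimal sketch, not part of the original setup: build_create_table_params is a hypothetical helper name, and the script assumes boto3 is installed and AWS credentials and a default region are configured.

```python
def build_create_table_params(table_name):
    """Return create_table keyword arguments matching create_table.sh."""
    return {
        "TableName": table_name,
        "AttributeDefinitions": [
            {"AttributeName": "deviceid", "AttributeType": "S"},
            {"AttributeName": "timestamp", "AttributeType": "S"},
        ],
        "KeySchema": [
            {"AttributeName": "deviceid", "KeyType": "HASH"},    # partition key
            {"AttributeName": "timestamp", "KeyType": "RANGE"},  # sort key
        ],
        "ProvisionedThroughput": {"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
    }


if __name__ == "__main__":
    # Imported here so the builder above can be used without boto3 installed
    import boto3

    client = boto3.client("dynamodb")
    client.create_table(**build_create_table_params("table_sep19"))
    # Block until the table exists before anything writes to it
    client.get_waiter("table_exists").wait(TableName="table_sep19")
```

Keeping the parameters in a plain function makes it possible to inspect or reuse the key schema without calling AWS.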

Creating the role

create_role.sh
ROLE_LAMBDA="role_lambda_sep19"
#
aws iam create-role --role-name $ROLE_LAMBDA \
    --assume-role-policy-document file://lambda-role-trust.json
lambda-role-trust.json
{
  "Version":"2012-10-17",
  "Statement":[{
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
  }]
}

Attaching policies to the role

attach_role.sh
ROLE_LAMBDA=role_lambda_sep19
#
aws iam attach-role-policy --role-name $ROLE_LAMBDA \
    --policy-arn "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess"
#
aws iam attach-role-policy --role-name $ROLE_LAMBDA \
    --policy-arn "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
#
aws iam attach-role-policy --role-name $ROLE_LAMBDA \
    --policy-arn "arn:aws:iam::aws:policy/AmazonKinesisFullAccess"

Creating the Lambda function

Reference program:
lambda_function.py

lambda_sep19.py
# -*- coding: utf-8 -*-
#
#   lambda_sep19.py
#
#                      Sep/22/2021
#
# --------------------------------------------------------------------
import sys
import os
import base64
import json
from decimal import Decimal
import boto3
# --------------------------------------------------------------------
def insert_main_proc(table_name,payloads):
    sys.stderr.write("*** insert_main_proc *** start ***\n")
    print(table_name)
#
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)
#
    # batch_writer buffers the puts and sends them as batch write requests.
    # overwrite_by_pkeys de-duplicates items that share the same key in one batch.
    with table.batch_writer(overwrite_by_pkeys=['deviceid', 'timestamp']) as batch:
        for payload in payloads:
            batch.put_item(
                Item={
                    'deviceid': payload["DEVICE_NAME"],
                    'timestamp': payload["TIMESTAMP"],
                    'temperature': payload["TEMPERATURE"],
                    'humidity': payload["HUMIDITY"]
                }
            )
#
    sys.stderr.write("*** insert_main_proc *** end ***\n")
#
# --------------------------------------------------------------------
def lambda_handler(event, context):
    sys.stderr.write("*** lambda_handler *** start ***\n")
#
    table_name = os.environ.get('TABLE_NAME', "default")
    print("Received event: " + json.dumps(event, indent=2))
    payloads = []
    # One invocation can carry many records (up to the trigger's batch size),
    # so process all of them rather than only Records[0]
    for record in event["Records"]:
        decoded = base64.b64decode(record["kinesis"]["data"])
        json_str = decoded.decode()
        print(json_str)
        # boto3 rejects Python floats for DynamoDB numbers, so parse them as Decimal
        payloads.append(json.loads(json_str, parse_float=Decimal))
    insert_main_proc(table_name,payloads)
#
    sys.stderr.write("*** lambda_handler *** end ***\n")
#
    return "OK"
# --------------------------------------------------------------------
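
The decoding step can be checked locally before deploying. The following is a minimal sketch under stated assumptions: make_kinesis_event and decode_records are hypothetical helper names, the sample device name and readings are made up, and the event contains only the Records[].kinesis.data field that the handler reads (a real event from Lambda carries more fields). Floats are parsed as Decimal because boto3's DynamoDB resource does not accept Python floats.

```python
import base64
import json
from decimal import Decimal


def make_kinesis_event(payloads):
    """Build a minimal Kinesis-style event (hypothetical helper for local tests)."""
    records = []
    for payload in payloads:
        raw = json.dumps(payload).encode("utf-8")
        # Kinesis record data arrives base64-encoded in the Lambda event
        records.append({"kinesis": {"data": base64.b64encode(raw).decode("ascii")}})
    return {"Records": records}


def decode_records(event):
    """Decode every record in the event back into a dict."""
    decoded = []
    for record in event["Records"]:
        json_str = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        decoded.append(json.loads(json_str, parse_float=Decimal))
    return decoded


if __name__ == "__main__":
    # Made-up sample readings, using the field names the handler expects
    sample = [
        {"DEVICE_NAME": "device_01", "TIMESTAMP": "2021-09-19 10:00:00",
         "TEMPERATURE": 25.4, "HUMIDITY": 60.1},
        {"DEVICE_NAME": "device_01", "TIMESTAMP": "2021-09-19 10:00:10",
         "TEMPERATURE": 25.6, "HUMIDITY": 59.8},
    ]
    for item in decode_records(make_kinesis_event(sample)):
        print(item)
```

Passing the result of make_kinesis_event to lambda_handler would exercise the full path, but that call writes to DynamoDB and therefore needs credentials and the table to exist.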

Compressing with zip

wget_zip.sh
#
NAME="lambda_sep19"
#
# Remove only the old archive; lambda_sep19.py must remain so it can be zipped
rm -f ${NAME}.zip
#
zip -r ${NAME}.zip ${NAME}.py
#

This produces the following file.

lambda_sep19.zip

create_lambda.sh
#
NAME="lambda_sep19"
aws lambda create-function \
    --function-name ${NAME} \
    --runtime python3.9 \
    --role arn:aws:iam::123495704257:role/role_lambda_sep19 \
    --environment "Variables={TABLE_NAME=table_sep19}" \
    --handler lambda_sep19.lambda_handler \
    --zip-file fileb://${NAME}.zip \
    --region ap-northeast-1
#

Setting the trigger

create_event.sh
#
NAME="lambda_sep19"
#
aws lambda create-event-source-mapping --function-name $NAME \
--batch-size 500 --starting-position AT_TIMESTAMP \
--starting-position-timestamp 1541139109 \
--event-source-arn arn:aws:kinesis:ap-northeast-1:123495704257:stream/kinesis_sep19
#
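
The value given to --starting-position-timestamp is Unix time in seconds; 1541139109 corresponds to 2018-11-02 06:11:49 UTC. The sketch below shows one way to convert between a UTC datetime and that format when choosing a different starting point. The helper names to_epoch_seconds and from_epoch_seconds are hypothetical, and the example date is only an illustration.

```python
from datetime import datetime, timezone


def to_epoch_seconds(dt):
    """Convert a timezone-aware datetime to whole Unix seconds."""
    if dt.tzinfo is None:
        # A naive datetime would be interpreted in local time, which is easy to get wrong
        raise ValueError("datetime must be timezone-aware")
    return int(dt.timestamp())


def from_epoch_seconds(seconds):
    """Convert Unix seconds to a UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


if __name__ == "__main__":
    # The timestamp used in create_event.sh
    print(from_epoch_seconds(1541139109).isoformat())
    # Epoch seconds for the start of an example day in UTC
    print(to_epoch_seconds(datetime(2021, 9, 19, tzinfo=timezone.utc)))
```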