0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Kinesis Data Streams -> Lambda -> DynamoDB 検証用コードスニペット

Posted at

Kinesis Data Streams -> Lambda -> DynamoDB 構築後に Kinesis Data Streams へのデータ投入〜DynamoDB からのデータ削除までに使ったコードのメモ。
どれも大したことやってないけど、いつか次やることがあったときにコピペでできて欲しい。

まとめ

  • Kinesis Data Streams へデータを投入する
  • Kinesis Data Streams をトリガーにしている Lambda 関数で DynamoDB にデータを投入する
  • DynamoDB からレコードを消すコード

Producerを作る

Kinesis Data Streams へデータ投入。
基本は Kinesis Data Generator を使うことを考える。CloudFormation テンプレートがありセットアップも簡単で、秒間数千以上のランダムデータを簡単に送信できる。
Amazon Kinesis Data Generatorを使用してストリーミングデータソリューションをテストする

Kinesis Data Generator は Faker.js というライブラリを使用してテストデータが作成できるが、これで作成できないデータを使いたい場合には Producer を自作する必要がある。検証用に連番が欲しかったので自作したが、非常に遅いので注意。

EC2 インスタンス準備

Amazon Linux 2 準備。PutRecord, PutRecords のみ許可したロールをアタッチ。
pip3 install boto3
以下の2つのファイルを作って実行。

producer.py
import boto3
import json
import sys
import copy

STREAM_NAME='xxxxxx'

client = boto3.client('kinesis',region_name="ap-northeast-1")

# batch_size * loop = total number of put records
# バッチサイズは最大500らしい
batch_size = 500
loop = 150

json_data = json.load(open('data.json', 'r'))

for j in range(0,loop):
    for i in range(0,batch_size):
        records = [] 
        current_fin_batch_num = batch_size * j

                # copyしないと参照渡しになる
        put_data = copy.copy(json_data)
        put_data['primaryId'] = str(current_fin_batch_num + i)

        data_elm =     {    
                'Data': json.dumps(put_data),
                'PartitionKey':str(i)
            }
        records.append(data_elm)
        # print(records)

        response = client.put_records(
            Records=records,
            StreamName=STREAM_NAME
        )
        
    print("finished "+ str( batch_size * (j+1) ))
data.json
{
    "text": "hoge"
}

コード特徴メモ

  • 4000-5000 records/min と低速
  • data.jsonを読み込み、primaryIdという連番のキーを追加してputしている
  • primaryIdstrにしているが、結局後の工程でintに変換しているので意味なさそうに見える

DynamoDB 作成

partkeyっていうプライマリーキーを作った。ここにはlambdaで生成したuuidを入れる。

Lambda を実装(DynamoDB へのデータ投入)

Kinesis Data Streams からトリガーされる Lambda 関数のコードは以下。

import base64
import json
import boto3.dynamodb.conditions
import uuid

dynamodb = boto3.resource('dynamodb')
table    = dynamodb.Table('<dynamodb-table-name>')

def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))
    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        #print(type(payload))  ## str
        
        data = json.loads(payload)
        #print(type(data)) ## dict
        
        key = int(data["primaryId"])
        
        table.put_item(
            Item={
                "partkey":str(uuid.uuid4()),
                "key":key
            })
        
        
    return 'Successfully processed {} records.'.format(len(event['Records']))

コード特徴メモ

  • base64 デコードしている
  • uuid 作成してプライマリキーに設定してる

DynamoDB からレコードを削除するコード

別に Lambda を用意する。コンソールからテスト実行する用。
これは 親切な人が投下していたものを使った。2020年11/27に親切な人が書いてくれている。
修正箇所はTABLE_NAMEの部分と、DeleteRequestのキー指定の計2箇所。DeleteRequestの修正部分がダブルクォートや変数だとダメだったのでベタ書きした。

ID_NAME = "partkey";
// ...中略...
            DeleteRequest: {
                Key: {
                    partkey: i[ID_NAME],      // ID_NAME: i[ID_NAME] とか "partkey": i[ID_NAME] だとダメ
                },
            },

特徴

  • 1分に10000レコードちょい消せる
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?