Kinesis Data Streams -> Lambda -> DynamoDB 構築後に Kinesis Data Streams へのデータ投入〜DynamoDB からのデータ削除までに使ったコードのメモ。
どれも大したことやってないけど、いつか次やることがあったときにコピペでできて欲しい。
まとめ
- Kinesis Data Streams へデータを投入する
- Kinesis Data Streams をトリガーにしている Lambda 関数で DynamoDB にデータを投入する
- DynamoDB からレコードを消すコード
Producerを作る
Kinesis Data Streams へデータ投入。
基本は Kinesis Data Generator を使うことを考える。CloudFormation テンプレートがありセットアップも簡単で、秒間数千以上のランダムデータを簡単に送信できる。
Amazon Kinesis Data Generatorを使用してストリーミングデータソリューションをテストする
Kinesis Data Generator は Faker.js というライブラリを使用してテストデータが作成できるが、これで作成できないデータを使いたい場合には Producer を自作する必要がある。検証用に連番が欲しかったので自作したが、非常に遅いので注意。
EC2 インスタンス準備
Amazon Linux 2 準備。PutRecord, PutRecords のみ許可したロールをアタッチ。
pip3 install boto3
以下の2つのファイルを作って実行。
import boto3
import json
import sys
import copy
STREAM_NAME='xxxxxx'
client = boto3.client('kinesis',region_name="ap-northeast-1")
# batch_size * loop = total number of put records
# バッチサイズは最大500らしい
batch_size = 500
loop = 150
json_data = json.load(open('data.json', 'r'))
for j in range(0,loop):
for i in range(0,batch_size):
records = []
current_fin_batch_num = batch_size * j
# copyしないと参照渡しになる
put_data = copy.copy(json_data)
put_data['primaryId'] = str(current_fin_batch_num + i)
data_elm = {
'Data': json.dumps(put_data),
'PartitionKey':str(i)
}
records.append(data_elm)
# print(records)
response = client.put_records(
Records=records,
StreamName=STREAM_NAME
)
print("finished "+ str( batch_size * (j+1) ))
{
"text": "hoge"
}
コード特徴メモ
- 4000-5000 records/min と低速
-
data.json
を読み込み、primaryId
という連番のキーを追加してputしている -
primaryId
はstr
にしているが、結局後の工程でint
に変換しているので意味なさそうに見える
DynamoDB 作成
partkeyっていうプライマリーキーを作った。ここにはlambdaで生成したuuidを入れる。
Lambda を実装(DynamoDB へのデータ投入)
Kinesis Data Streams からトリガーされる Lambda 関数のコードは以下。
import base64
import json
import boto3.dynamodb.conditions
import uuid
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('<dynamodb-table-name>')
def lambda_handler(event, context):
#print("Received event: " + json.dumps(event, indent=2))
for record in event['Records']:
# Kinesis data is base64 encoded so decode here
payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
#print(type(payload)) ## str
data = json.loads(payload)
#print(type(data)) ## dict
key = int(data["primaryId"])
table.put_item(
Item={
"partkey":str(uuid.uuid4()),
"key":key
})
return 'Successfully processed {} records.'.format(len(event['Records']))
コード特徴メモ
- base64 デコードしている
- uuid 作成してプライマリキーに設定してる
DynamoDB からレコードを削除するコード
別に Lambda を用意する。コンソールからテスト実行する用。
これは 親切な人が投下していたものを使った。2020年11/27に親切な人が書いてくれている。
修正箇所はTABLE_NAMEの部分と、DeleteRequest
のキー指定の計2箇所。DeleteRequest
の修正部分がダブルクォートや変数だとダメだったのでベタ書きした。
ID_NAME = "partkey";
// ...中略...
DeleteRequest: {
Key: {
partkey: i[ID_NAME], // ID_NAME: i[ID_NAME] とか "partkey": i[ID_NAME] だとダメ
},
},
特徴
- 1分に10000レコードちょい消せる