動機
- 仕事でkinesisの話題になった。知ったかぶりができるようにするため、基本的な動作を実機で確認しておく。
やったこと
- streamの作成
- streamへのデータの書き込み/読み出し(AWS CLI)
- pythonスクリプトからstreamに書き込みして、streamにレコードが書き込まれたらlambdaで処理
参考にした記事
AWS CLIを使って初めてAmazon Kinesisを使ってみた
[新機能]Kinesis Streamsが時刻ベースのイテレーターに対応しました
Lambda(Python)からKinesisにPut(API)してLambda(Python)でGet(Event)する
手順(CLIでの書き込み/読み出し)
# shard 1本のstreamを作成
$ aws kinesis create-stream --stream-name mksamba-stream --shard-count 1
- 一番シンプルなshard1本でのstreamを作成。
# 1個目のデータの書き込み
$ aws kinesis put-record --stream-name mksamba-stream --partition-key 123 --data anpanman
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49593062427984882686636760320963710719404264693501526018"
}
- shard1本なのでpartition-keyの内容に関わらず必ずこのshardに入る。
# 1個目のデータの読み出し
$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name mksamba-stream
{
"ShardIterator": "AAAAAAAAAAE/WfX4ZpvcDznZT70nxmW4zUu8d3o7eqAYV9niw88UuaxVrTpm/MUC1jFrBXMyhI35oPQvKDRC+u3gY5wBYQh5jKZDF9eAdtMCHB2t136KTGcpDA7i54R5CTzOf5lo1aUqzEcmX1iBRfxAnzaBfi+XpPDRXKN52fDXJXCtPF3hg5MIcCj376QTwQvm5nwKaF+1ou1yrYWrdFnKUgTy0XoP"
}
$ aws kinesis get-records --shard-iterator AAAAAAAAAAE/WfX4ZpvcDznZT70nxmW4zUu8d3o7eqAYV9niw88UuaxVrTpm/MUC1jFrBXMyhI35oPQvKDRC+u3gY5wBYQh5jKZDF9eAdtMCHB2t136KTGcpDA7i54R5CTzOf5lo1aUqzEcmX1iBRfxAnzaBfi+XpPDRXKN52fDXJXCtPF3hg5MIcCj376QTwQvm5nwKaF+1ou1yrYWrdFnKUgTy0XoP
{
"Records": [
{
"Data": "YW5wYW5tYW4=",
"PartitionKey": "123",
"ApproximateArrivalTimestamp": 1550411747.067,
"SequenceNumber": "49593062427984882686636760320963710719404264693501526018"
}
],
"NextShardIterator": "AAAAAAAAAAElfsIV9+Qhkuy59jiyJgfSmaA7tAhra8Pitt280V+rhWch3VaqycIduEXiSZKgIIoNcpyyx9tbpb5oaWDkttJFguomKcGVtCbsMlh3b5aO8hjySCU1BOstTkoCKo+KmB3fVY8S4e+xllLP8D0c5IfTB5wmSTg2jPhku3pgCNIbhgrdmxo5NbocHCn77DySzrBlcD4Cvm9LaaLYQuZpW4vk",
"MillisBehindLatest": 0
}
$ echo YW5wYW5tYW4= | base64 --decode
anpanman
- まずイテレータを取得し、イテレータを指定してレコードを取得する。レコードはbase64 encodeされているためdecodeする。
# 2個目のデータの書き込み
$ aws kinesis put-record --stream-name mksamba-stream --partition-key 123 --data baikinman
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49593062427984882686636760321531905854623186722540748802"
}
- 同じshardに2個目のデータを書き込む。
# 2個目のデータの読み出し
$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name mksamba-stream
{
"ShardIterator": "AAAAAAAAAAFWgFqMKIxJNggikvbo7dus2kgZyyYGd8F/ZY0r453xruvlXHKlRydL05PpGDs36+wD/GuwY7A6KmU94eDwgi2bGgm3KEJ7wqnz9rN0qtmftCZ2t1O/lk1ztLWQWhzShbcQWxHuM4DLj0y56ahI0tgRdELS7ltSATdMlrqBMmdzJcDE6WUKvg4ZMi6kjtvQvGYBx90HR4EuR9je4ZSkkQdg"
}
$ aws kinesis get-records --shard-iterator AAAAAAAAAAFWgFqMKIxJNggikvbo7dus2kgZyyYGd8F/ZY0r453xruvlXHKlRydL05PpGDs36+wD/GuwY7A6KmU94eDwgi2bGgm3KEJ7wqnz9rN0qtmftCZ2t1O/lk1ztLWQWhzShbcQWxHuM4DLj0y56ahI0tgRdELS7ltSATdMlrqBMmdzJcDE6WUKvg4ZMi6kjtvQvGYBx90HR4EuR9je4ZSkkQdg
{
"Records": [
{
"Data": "YW5wYW5tYW4=",
"PartitionKey": "123",
"ApproximateArrivalTimestamp": 1550411747.067,
"SequenceNumber": "49593062427984882686636760320963710719404264693501526018"
},
{
"Data": "YmFpa2lubWFu",
"PartitionKey": "123",
"ApproximateArrivalTimestamp": 1550412420.802,
"SequenceNumber": "49593062427984882686636760321531905854623186722540748802"
}
],
"NextShardIterator": "AAAAAAAAAAGxAQrYwzNV3D84if3DuHkWj3qoZuKH7EJX0x9Chx6IHDaOe0jjrXSRGLMS614CPMuAB01Yqdat0XSqtuD3zAiD11McKb7+NrrnPJURFZMhHo9KABowzt9y6xwKId/eRqImrdlPgqDUoasCrFO9snOXYdqSeEBXp2ruEV/DYo9+FLRmF+RPSF7vS0NgV+f2KDxsmKv6MBNmzAX46E9WYQUw",
"MillisBehindLatest": 0
}
$ echo YmFpa2lubWFu | base64 --decode
baikinman
- shard-iterator-type TRIM_HORIZON にしているため、shard内にたまっているレコードは再度表示されている。
手順(Python Scriptでstreamへ書き込み)
# kinesis-put.py
import boto3
client = boto3.client('kinesis')
response = client.put_record(
Data="dokin-chan",
PartitionKey='123',
StreamName='mksamba-stream'
)
print(response)
- EC2インスタンス(amazon linux 2)にて stream への put を実行。
- python3, boto3, credential, region 等は事前に設定されている前提。
手順(lambdaでstreamから読み出し)
# kinesis-lambda.py
import logging
import base64
def lambda_handler(event, context):
logger = logging.getLogger()
logger.setLevel(logging.INFO)
for record in event['Records']:
#Kinesis data is base64 encoded so decode here
payload=base64.b64decode(record["kinesis"]["data"])
print("Decoded payload: " + str(payload))
- AWSのサンプルコード通りにlambdaを作成する。
- data source としてkinesisを設定。
- stream に書き込みがあると、lambda が起動して読み出しが行われる。(この例では読み出してcloudwatch logsに保存するのみ)
所感
- シャードとかの超基本概念は理解できたが、イテレータの指定の方法(TRIM_HORIZON)の使い方等、やっていてよくよくわからないところもあり引き続き触っていきたい。