はじめに
この記事は、前回の続きです。
IoT Core の準備が出来たので、実際に Kinesis Data Streams と Lambda タンブリングウィンドウを連携してみましょう。
システム構成
今回の作業範囲を明記した、システム構成図を載せます。画像右側のデータ集計の本体部分です。
Lambda タンブリングウィンドウがデータをどのように集計するのか
今回の構成では、30秒おきに Lambda タンブリングウィンドウをつかってデータ集計をしていきます。30秒の間に複数の Lambda Function がリレーのバトンのように state
データを受け渡していきます。state
データの受け渡しイメージ図を次に乗せます。わかりやすく書いているので、細かい部分は説明不足な点がありますがご了承ください。
タンブリングウィンドウの一定周期の枠の中で、最初に呼び出される関数が空の state
を受け取ります。state
は、Lambda Handler の event の中に含まれています。2回目以降の関数は state
は何かしらのデータが入っているので、空の場合は最初の呼び出しだと識別できます。
def lambda_handler(event, context):
state = event['state']
Lambda 関数は、以下の例のように、次の Lambda 関数へ state
を受け渡しできます。そのため、Kinesis Data Streams から受け取ったデータを、Lambda 内で集計加工して state
として受け渡すことが出来ます。
return {
"statusCode": 200,
"state": state # 次に呼びだされる Lambda Function に引き継がれる
}
すると、2回目に呼び出された Lambda 関数では、最初の関数の state
をそのまま受け取れます。2回目以降の Lambda 関数でも集計加工を行い受け渡し続けることでことで、最後の呼び出し Lambda 関数でリアルタイム集計を行った結果を利用できます。
最後の Lambda 関数かどうか識別するために、Lambda の event に含まれている変数が活用できます。event に含まれる isFinalInvokeForWindow
か isWindowTerminatedEarly
が True の場合は、その Lambda 関数が、タンブリングウィンドウの中で最後の呼び出しになります。
isFinalInvokeForWindow
が True の場合は、ウィンドウ時間枠を超えた場合に起きます。isWindowTerminatedEarly
が True の場合は、時間範囲内でも、state
の内容が 1MB を超えたときに発生します。state
の上限が 1MB ということですね。
URL : https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-kinesis.html
ここまで概念的な学習をしたので、これから実際に手を動かす内容に移っていきます。
Kinesis Data Streams の設定
IoT Core から連携する、Kinesis Data Streams を作成していきます。Create data stream を押します。
今回は、テスト用途となり性能ピークなどはないので、Provisioned で構成していきます。(名前は kinesis data analytics と書いていますが、Lambda 連携を行っていきます。)
Create data stream を押します
作成されました。
IoT Core Rule を設定
IoT Core の MQTT Topic に送付された JSON データを、Kinesis Data Streams に出力するために、IoT Core Rule を設定します。
Rule の名前や条件を指定します。
次のように FROM に MQTT Topic 名を指定することで、この Topic のデータを全て Kinesis Data Streams に流すことが出来ます。
SELECT * FROM 'data/kinesis-test-device'
Add action で Kinesis との連携設定を入れていきます。
Kinesis Data Streams
- 対象の Data Stream の名前を指定
- Paritition Key は、なんでもよいので newuuid を指定
- IAM Role を指定
Create Rule を押します
作成されたので Enable
Enable になりました
Lambda 関数の作成
Lambda 関数を作成していきます。この記事の環境では SAM で作成しました。マネージメントコンソールからポチポチと作成しても良いとおもいます。
プログラムの全体を載せます。
from datetime import date
import json
from logging import getLogger, INFO
import base64
import decimal
import boto3
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key
dynamodb = boto3.resource('dynamodb')
dynamodb_table = dynamodb.Table('lambda_tumbling_window')
logger = getLogger(__name__)
logger.setLevel(INFO)
def lambda_handler(event, context):
print("============ logger.info の出力 ============")
logger.info(json.dumps(event))
# Window 内の最終呼び出し
if event['isFinalInvokeForWindow']:
print('Destination invoke')
store_state(event)
return {
"statusCode": 200,
"body": json.dumps({
"message": "isFinalInvokeForWindow is True",
})
}
else:
print('Aggregate invoke')
# Window の早期終了
if event['isWindowTerminatedEarly']:
print('Window terminated early')
store_state(event)
state = event['state']
for record in event['Records']:
decodedata = base64.b64decode(record["kinesis"]["data"])
decodejson = json.loads(decodedata, parse_float=decimal.Decimal)
# print(decodedata)
# print(decodejson['ROOM'])
roomid = str(decodejson['ROOM'])
# state に存在しない場合は初期化
if roomid not in state:
state[roomid] = {"PEOPLE_SUM": 0,
"RECORD_COUNT": 0, "PEOPLE_AVG": 0}
state[roomid]["PEOPLE_SUM"] += decodejson['PEOPLE']
state[roomid]["RECORD_COUNT"] += 1
state[roomid]["PEOPLE_AVG"] = state[roomid]["PEOPLE_SUM"] / \
state[roomid]["RECORD_COUNT"]
return {
"statusCode": 200,
"body": json.dumps({
"message": "hello tumbling window",
}),
"state": state # state の返却。次に呼びだされる Lambda Function に引き継がれる
}
def store_state(event):
state = event['state']
date_start = event['window']['start']
date_end = event['window']['end']
kinesis_shardid = event['shardId']
for roomid, value in state.items():
item = {"roomid": roomid,
"date_start": date_start,
"date_end": date_end,
"people_avg": float_to_decimal(value["PEOPLE_AVG"]),
"kinesis_shardid": kinesis_shardid}
dynamodb_table.put_item(Item=item)
def float_to_decimal(obj):
if isinstance(obj, float):
# note: str casting is important for dynamodb
return decimal.Decimal(str(obj))
return obj
いくつかポイントをピックアップします。
次の部分で、Lambda 関数の呼び出しが最後かどうか判定しています。最後の場合は、DynamoDB にデータを吐き出す処理 store_state(event)
を行っています。
# Window 内の最終呼び出し
if event['isFinalInvokeForWindow']:
print('Destination invoke')
store_state(event)
return {
"statusCode": 200,
"body": json.dumps({
"message": "isFinalInvokeForWindow is True",
})
}
else:
print('Aggregate invoke')
# Window の早期終了
if event['isWindowTerminatedEarly']:
print('Window terminated early')
store_state(event)
この部分は、state
を event から取り出して、初期化や集計処理を行っている部分です。
state = event['state']
for record in event['Records']:
decodedata = base64.b64decode(record["kinesis"]["data"])
decodejson = json.loads(decodedata, parse_float=decimal.Decimal)
# print(decodedata)
# print(decodejson['ROOM'])
roomid = str(decodejson['ROOM'])
# state に存在しない場合は初期化
if roomid not in state:
state[roomid] = {"PEOPLE_SUM": 0,
"RECORD_COUNT": 0, "PEOPLE_AVG": 0}
state[roomid]["PEOPLE_SUM"] += decodejson['PEOPLE']
state[roomid]["RECORD_COUNT"] += 1
state[roomid]["PEOPLE_AVG"] = state[roomid]["PEOPLE_SUM"] / \
state[roomid]["RECORD_COUNT"]
Kinesis Data Streams からは、次のように ROOM ごとの滞在人数の JSON データがストリーミングで送られてくるため、滞在人数の平均をリアルタイムで集計をしていっています。
{
"ROOM": 7,
"EVENTTIME": "2022-04-10T12:56:09",
"PEOPLE": 32
}
次の関数に state
を引き継ぐ部分はこちらです。return で返却すると、次に受け渡してくれます。便利ですね。
return {
"statusCode": 200,
"body": json.dumps({
"message": "hello tumbling window",
}),
"state": state # state の返却。次に呼びだされる Lambda Function に引き継がれる
}
Lambda タンブリングウィンドウの設定
Lambda 関数が作成できたら、Kinesis Data Streams との連携を行います。Add trigger を押します。
Kinesis Data Streams を選択して、Batch size などを選択します。
タンブリングウィンドウの設定個所はここです。地味なので見逃さないようにしましょう。 この設定値は、30 秒ごとの周期で集計処理を行う設定になります。最大 15分まで指定可能です。
add を押します。
追加されました。
DynamoDB Table の作成
Lambda が最終的に書き込む DymamoDB Table を用意します。Create table を押します。
次の値を入れます。
lambda_tumbling_window
roomid
data_start
Create Table
作成されました
動作確認
疑似 IoT デバイスとして用意した、Amazon Linux2 で以下のプログラムを動かします。
cd ~/temp/kinesis-data-analytics
python3 device_main.py --device_name kinesis-test-device --endpoint aa49ciipg36pc-ats.iot.ap-northeast-1.amazonaws.com
1秒おきに、IoT Core にデータが送られ、Kinesis 経由で Lambda が起動します。
Lambda が正常に稼働して、DynamoDB にリアルタイムな集計結果が格納されました
- Lambda のタンブリングウィンドウで設定したとおり、30 秒おきに平均滞在人数が集計されている
- shard 単位の集計となっており、shard を超えた集計は出来ない
検証を通じてわかったこと
- Lambda タンブリングウィンドウを使うことで、面倒なストリーミングデータのリアルタイム集計が楽になる
- Lambda タンブリングウィンドウの設定は、Kinesis Data Streams の shard 単位の集計となっており、shard を超えた集計は出来ない
- 超えて集計したい場合は、DynamoDB などに出力したあとに集計処理を行うとよいかも。
- タンブリングウィンドウの最大は 15 分
- state の上限は 1MB。これを超えると早期終了する。
- もしくは、Kinesis Data Analytics で集計を行うとシャードを超えて集計が可能と思われる。
- Kinesis Lambda の連携は、1秒に1回ポーリング
- https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-kinesis.html
-
レコードの Kinesis ストリームにある各シャードを 1 秒あたり 1 回の基本レートでポーリングします
付録 : Lambda が受け取る event の中身
Lambda 関数が受け取る Event を載せます。
初回呼び出し : state なし
- Kinesis Data Streams から10件のデータを受け取っている
- isFinalInvokeForWindow : false
- state : 初回呼び出しなので空っぽ
- window : 開始時間と終了時間が格納されている
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "88b6423a-55a8-4f0a-9ad4-9439b7b7ffa1",
"sequenceNumber": "49628438283851452250018774692429789180601711843174514690",
"data": "eyJST09NIjogMSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDg0fQ==",
"approximateArrivalTimestamp": 1649583654.921
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018774692429789180601711843174514690",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "453140e9-cf6f-4c59-9def-91a80ea1bdce",
"sequenceNumber": "49628438283851452250018774692434624883880170359873339394",
"data": "eyJST09NIjogMiwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDIzfQ==",
"approximateArrivalTimestamp": 1649583654.942
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018774692434624883880170359873339394",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "37f3a705-4a04-4b82-b28f-18e5f8d1f121",
"sequenceNumber": "49628438283851452250018774692437042735519399618222751746",
"data": "eyJST09NIjogNCwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDY5fQ==",
"approximateArrivalTimestamp": 1649583654.945
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018774692437042735519399618222751746",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1bc63c1e-b68b-4751-9f17-9b76dd345d52",
"sequenceNumber": "49628438283851452250018774692438251661339014247397457922",
"data": "eyJST09NIjogNiwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDI3fQ==",
"approximateArrivalTimestamp": 1649583654.945
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018774692438251661339014247397457922",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "cc806a2d-e967-45ca-8cc3-502d51792169",
"sequenceNumber": "49628438283851452250018774692441878438797858203641053186",
"data": "eyJST09NIjogNSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDE0fQ==",
"approximateArrivalTimestamp": 1649583654.95
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018774692441878438797858203641053186",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "9a0402c5-aaac-47f7-bf6e-376192052706",
"sequenceNumber": "49628438283851452250018774692443087364617472764096282626",
"data": "eyJST09NIjogOSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDg1fQ==",
"approximateArrivalTimestamp": 1649583654.953
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018774692443087364617472764096282626",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "cf965cfc-0f1e-44f6-a508-25b7450e2de5",
"sequenceNumber": "49628438283851452250018774692445505216256702022445694978",
"data": "eyJST09NIjogNywgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDEwMH0=",
"approximateArrivalTimestamp": 1649583654.965
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018774692445505216256702022445694978",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "11b49d8b-bc6a-4ec2-ab7f-3dd774d62860",
"sequenceNumber": "49628438283851452250018774692467265881009765416309882882",
"data": "eyJST09NIjogMywgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDU3fQ==",
"approximateArrivalTimestamp": 1649583654.982
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018774692467265881009765416309882882",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "4bb30523-c946-4572-b23c-51cbf69f8136",
"sequenceNumber": "49628438283851452250018774692469683732648994605939818498",
"data": "eyJST09NIjogOCwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDgxfQ==",
"approximateArrivalTimestamp": 1649583654.984
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018774692469683732648994605939818498",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "b0214d52-2125-41b6-ac86-02a8c5270a87",
"sequenceNumber": "49628438283851452250018774693182949966221625887735939074",
"data": "eyJST09NIjogNSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU1IiwgIlBFT1BMRSI6IDk4fQ==",
"approximateArrivalTimestamp": 1649583655.935
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018774693182949966221625887735939074",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
}
],
"shardId": "shardId-000000000000",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source",
"window": {
"start": "2022-04-10T09:40:30Z",
"end": "2022-04-10T09:41:00Z"
},
"state": {},
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}
途中の呼び出し : state あり
- Kinesis Data Streams から10件のデータを受け取っている
- isFinalInvokeForWindow : false
- state : 2回目以降の呼び出しなので、state を受け取っている
- window : 開始時間と終了時間が格納されている
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "e6965fe8-99fb-4b30-bc7e-ec15e0701005",
"sequenceNumber": "49628438283851452250018780975352673409626969805380648962",
"data": "eyJST09NIjogMiwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjAzIiwgIlBFT1BMRSI6IDg2fQ==",
"approximateArrivalTimestamp": 1649592364.071
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018780975352673409626969805380648962",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "7d948f85-5a1d-49e6-ab87-b53ef2378289",
"sequenceNumber": "49628438283851452250018780975353882335446584434555355138",
"data": "eyJST09NIjogOSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjAzIiwgIlBFT1BMRSI6IDkzfQ==",
"approximateArrivalTimestamp": 1649592364.071
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018780975353882335446584434555355138",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "b998ace7-3ae7-4dbe-a4e3-f96b7bd14240",
"sequenceNumber": "49628438283851452250018780975356300187085813692904767490",
"data": "eyJST09NIjogNywgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjAzIiwgIlBFT1BMRSI6IDY3fQ==",
"approximateArrivalTimestamp": 1649592364.074
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018780975356300187085813692904767490",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "d68f6905-91c8-47f3-8d34-3efdb0b311b3",
"sequenceNumber": "49628438283851452250018780975357509112905428322079473666",
"data": "eyJST09NIjogNCwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjAzIiwgIlBFT1BMRSI6IDkzfQ==",
"approximateArrivalTimestamp": 1649592364.074
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018780975357509112905428322079473666",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "7f2b045e-4518-4584-8ef3-004ba79b9f83",
"sequenceNumber": "49628438283851452250018780975380478703478106276398891010",
"data": "eyJST09NIjogOCwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjAzIiwgIlBFT1BMRSI6IDIyfQ==",
"approximateArrivalTimestamp": 1649592364.112
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018780975380478703478106276398891010",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "51dda732-fb0c-416a-bce4-0c9b000baa4b",
"sequenceNumber": "49628438283851452250018780976003075500579640370092048386",
"data": "eyJST09NIjogMSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjA0IiwgIlBFT1BMRSI6IDYyfQ==",
"approximateArrivalTimestamp": 1649592365.046
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018780976003075500579640370092048386",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "49ace2dc-bdbf-4471-9e26-7e3c623aeaeb",
"sequenceNumber": "49628438283851452250018780976005493352218869628441460738",
"data": "eyJST09NIjogOCwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjA0IiwgIlBFT1BMRSI6IDYxfQ==",
"approximateArrivalTimestamp": 1649592365.049
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018780976005493352218869628441460738",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "d4807f04-f3cc-4d06-9432-cdb8f7a08eee",
"sequenceNumber": "49628438283851452250018780976011537981316942774314991618",
"data": "eyJST09NIjogOSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjA0IiwgIlBFT1BMRSI6IDkzfQ==",
"approximateArrivalTimestamp": 1649592365.059
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018780976011537981316942774314991618",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "325b243e-55c8-4bcd-a569-a8566084194c",
"sequenceNumber": "49628438283851452250018780976012746907136557403489697794",
"data": "eyJST09NIjogNSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjA0IiwgIlBFT1BMRSI6IDE5fQ==",
"approximateArrivalTimestamp": 1649592365.06
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018780976012746907136557403489697794",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "28a6021a-a682-4637-846d-679bfb82d4a6",
"sequenceNumber": "49628438283851452250018780976013955832956172032664403970",
"data": "eyJST09NIjogMywgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjA0IiwgIlBFT1BMRSI6IDUxfQ==",
"approximateArrivalTimestamp": 1649592365.062
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49628438283851452250018780976013955832956172032664403970",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
}
],
"shardId": "shardId-000000000000",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source",
"window": {
"start": "2022-04-10T12:06:00Z",
"end": "2022-04-10T12:06:30Z"
},
"state": {
"1": {
"PEOPLE_SUM": 124,
"RECORD_COUNT": 5,
"PEOPLE_AVG": 24.8
},
"2": {
"PEOPLE_SUM": 319,
"RECORD_COUNT": 4,
"PEOPLE_AVG": 79.75
},
"3": {
"PEOPLE_SUM": 220,
"RECORD_COUNT": 5,
"PEOPLE_AVG": 44
},
"4": {
"PEOPLE_SUM": 162,
"RECORD_COUNT": 4,
"PEOPLE_AVG": 40.5
},
"5": {
"PEOPLE_SUM": 194,
"RECORD_COUNT": 5,
"PEOPLE_AVG": 38.8
},
"6": {
"PEOPLE_SUM": 199,
"RECORD_COUNT": 5,
"PEOPLE_AVG": 39.8
},
"7": {
"PEOPLE_SUM": 170,
"RECORD_COUNT": 4,
"PEOPLE_AVG": 42.5
},
"8": {
"PEOPLE_SUM": 257,
"RECORD_COUNT": 4,
"PEOPLE_AVG": 64.25
},
"9": {
"PEOPLE_SUM": 268,
"RECORD_COUNT": 4,
"PEOPLE_AVG": 67
}
},
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}
最後の呼び出し : state あり
- Kinesis Data Streams から**データを受け取らない**
- isFinalInvokeForWindow : true
- state : 受け取っている
- window : 開始時間と終了時間が格納されている
{
"Records": [],
"shardId": "shardId-000000000000",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source",
"window": {
"start": "2022-04-10T12:05:30Z",
"end": "2022-04-10T12:06:00Z"
},
"state": {
"1": {
"PEOPLE_SUM": 1829,
"RECORD_COUNT": 30,
"PEOPLE_AVG": 60.96666666666667
},
"2": {
"PEOPLE_SUM": 1529,
"RECORD_COUNT": 30,
"PEOPLE_AVG": 50.96666666666667
},
"3": {
"PEOPLE_SUM": 1784,
"RECORD_COUNT": 30,
"PEOPLE_AVG": 59.46666666666667
},
"4": {
"PEOPLE_SUM": 1665,
"RECORD_COUNT": 30,
"PEOPLE_AVG": 55.5
},
"5": {
"PEOPLE_SUM": 1764,
"RECORD_COUNT": 30,
"PEOPLE_AVG": 58.8
},
"6": {
"PEOPLE_SUM": 1577,
"RECORD_COUNT": 30,
"PEOPLE_AVG": 52.56666666666667
},
"7": {
"PEOPLE_SUM": 1685,
"RECORD_COUNT": 30,
"PEOPLE_AVG": 56.166666666666664
},
"8": {
"PEOPLE_SUM": 1632,
"RECORD_COUNT": 30,
"PEOPLE_AVG": 54.4
},
"9": {
"PEOPLE_SUM": 1689,
"RECORD_COUNT": 30,
"PEOPLE_AVG": 56.3
}
},
"isFinalInvokeForWindow": true,
"isWindowTerminatedEarly": false
}
参考URL