0
Help us understand the problem. What are the problem?

posted at

updated at

IoT リアルタイムデータを Lambda タンブリングウィンドウで集計してみた その2 Lambda 編

はじめに

この記事は、前回の続きです。

IoT Core の準備が出来たので、実際に Kinesis Data Streams と Lambda タンブリングウィンドウを連携してみましょう。

システム構成

今回の作業範囲を明記した、システム構成図を載せます。画像右側のデータ集計の本体部分です。

image-20220411000225976.png

Lambda タンブリングウィンドウがデータをどのように集計するのか

今回の構成では、30秒おきに Lambda タンブリングウィンドウをつかってデータ集計をしていきます。30秒の間に複数の Lambda Function がリレーのバトンのように state データを受け渡していきます。state データの受け渡しイメージ図を次に乗せます。わかりやすく書いているので、細かい部分は説明不足な点がありますがご了承ください。

image-20220411002805288.png

タンブリングウィンドウの一定周期の枠の中で、最初に呼び出される関数が空の state を受け取ります。state は、Lambda Handler の event の中に含まれています。2回目以降の関数は state は何かしらのデータが入っているので、空の場合は最初の呼び出しだと識別できます。

def lambda_handler(event, context):
    state = event['state']

Lambda 関数は、以下の例のように、次の Lambda 関数へ state を受け渡しできます。そのため、Kinesis Data Streams から受け取ったデータを、Lambda 内で集計加工して state として受け渡すことが出来ます。

    return {
        "statusCode": 200,
        "state": state  # 次に呼びだされる Lambda Function に引き継がれる
    }

すると、2回目に呼び出された Lambda 関数では、最初の関数の state をそのまま受け取れます。2回目以降の Lambda 関数でも集計加工を行い受け渡し続けることでことで、最後の呼び出し Lambda 関数でリアルタイム集計を行った結果を利用できます。

最後の Lambda 関数かどうか識別するために、Lambda の event に含まれている変数が活用できます。event に含まれる isFinalInvokeForWindowisWindowTerminatedEarly が True の場合は、その Lambda 関数が、タンブリングウィンドウの中で最後の呼び出しになります。

isFinalInvokeForWindow が True の場合は、ウィンドウ時間枠を超えた場合に起きます。isWindowTerminatedEarly が True の場合は、時間範囲内でも、state の内容が 1MB を超えたときに発生します。state の上限が 1MB ということですね。

URL : https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-kinesis.html

ここまで概念的な学習をしたので、これから実際に手を動かす内容に移っていきます。

Kinesis Data Streams の設定

IoT Core から連携する、Kinesis Data Streams を作成していきます。Create data stream を押します。

image-20220409215422895.png

今回は、テスト用途となり性能ピークなどはないので、Provisioned で構成していきます。(名前は kinesis data analytics と書いていますが、Lambda 連携を行っていきます。)

image-20220409220059250.png

Create data stream を押します

image-20220409220116852.png

作成されました。

image-20220409220203940.png

IoT Core Rule を設定

IoT Core の MQTT Topic に送付された JSON データを、Kinesis Data Streams に出力するために、IoT Core Rule を設定します。

image-20220409220809851.png

Rule の名前や条件を指定します。

次のように FROM に MQTT Topic 名を指定することで、この Topic のデータを全て Kinesis Data Streams に流すことが出来ます。

SELECT * FROM 'data/kinesis-test-device'

image-20220409221237480.png

Add action で Kinesis との連携設定を入れていきます。

image-20220409221338798.png

Kinesis Data Streams

image-20220409221414764.png

  • 対象の Data Stream の名前を指定
  • Paritition Key は、なんでもよいので newuuid を指定
  • IAM Role を指定

image-20220409221559991.png

Create Rule を押します

image-20220409221658813.png

作成されたので Enable

image-20220409221731079.png

Enable になりました

image-20220409221841720.png

Lambda 関数の作成

Lambda 関数を作成していきます。この記事の環境では SAM で作成しました。マネージメントコンソールからポチポチと作成しても良いとおもいます。

プログラムの全体を載せます。

from datetime import date
import json
from logging import getLogger, INFO
import base64
import decimal
import boto3
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
dynamodb_table = dynamodb.Table('lambda_tumbling_window')

logger = getLogger(__name__)
logger.setLevel(INFO)


def lambda_handler(event, context):
    print("============ logger.info の出力 ============")
    logger.info(json.dumps(event))

    # Window 内の最終呼び出し
    if event['isFinalInvokeForWindow']:
        print('Destination invoke')
        store_state(event)
        return {
            "statusCode": 200,
            "body": json.dumps({
                "message": "isFinalInvokeForWindow is True",
            })
        }
    else:
        print('Aggregate invoke')

    # Window の早期終了
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')
        store_state(event)

    state = event['state']

    for record in event['Records']:
        decodedata = base64.b64decode(record["kinesis"]["data"])
        decodejson = json.loads(decodedata, parse_float=decimal.Decimal)

        # print(decodedata)
        # print(decodejson['ROOM'])

        roomid = str(decodejson['ROOM'])

        # state に存在しない場合は初期化
        if roomid not in state:
            state[roomid] = {"PEOPLE_SUM": 0,
                             "RECORD_COUNT": 0, "PEOPLE_AVG": 0}

        state[roomid]["PEOPLE_SUM"] += decodejson['PEOPLE']
        state[roomid]["RECORD_COUNT"] += 1
        state[roomid]["PEOPLE_AVG"] = state[roomid]["PEOPLE_SUM"] / \
            state[roomid]["RECORD_COUNT"]

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "hello tumbling window",
        }),
        "state": state  # state の返却。次に呼びだされる Lambda Function に引き継がれる
    }


def store_state(event):
    state = event['state']
    date_start = event['window']['start']
    date_end = event['window']['end']
    kinesis_shardid = event['shardId']

    for roomid, value in state.items():
        item = {"roomid": roomid,
                "date_start": date_start,
                "date_end": date_end,
                "people_avg": float_to_decimal(value["PEOPLE_AVG"]),
                "kinesis_shardid": kinesis_shardid}
        dynamodb_table.put_item(Item=item)


def float_to_decimal(obj):
    if isinstance(obj, float):
        # note: str casting is important for dynamodb
        return decimal.Decimal(str(obj))
    return obj

いくつかポイントをピックアップします。

次の部分で、Lambda 関数の呼び出しが最後かどうか判定しています。最後の場合は、DynamoDB にデータを吐き出す処理 store_state(event) を行っています。

    # Window 内の最終呼び出し
    if event['isFinalInvokeForWindow']:
        print('Destination invoke')
        store_state(event)
        return {
            "statusCode": 200,
            "body": json.dumps({
                "message": "isFinalInvokeForWindow is True",
            })
        }
    else:
        print('Aggregate invoke')

    # Window の早期終了
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')
        store_state(event)

この部分は、state を event から取り出して、初期化や集計処理を行っている部分です。

    state = event['state']

    for record in event['Records']:
        decodedata = base64.b64decode(record["kinesis"]["data"])
        decodejson = json.loads(decodedata, parse_float=decimal.Decimal)

        # print(decodedata)
        # print(decodejson['ROOM'])

        roomid = str(decodejson['ROOM'])

        # state に存在しない場合は初期化
        if roomid not in state:
            state[roomid] = {"PEOPLE_SUM": 0,
                             "RECORD_COUNT": 0, "PEOPLE_AVG": 0}

        state[roomid]["PEOPLE_SUM"] += decodejson['PEOPLE']
        state[roomid]["RECORD_COUNT"] += 1
        state[roomid]["PEOPLE_AVG"] = state[roomid]["PEOPLE_SUM"] / \
            state[roomid]["RECORD_COUNT"]

Kinesis Data Streams からは、次のように ROOM ごとの滞在人数の JSON データがストリーミングで送られてくるため、滞在人数の平均をリアルタイムで集計をしていっています。

{
  "ROOM": 7,
  "EVENTTIME": "2022-04-10T12:56:09",
  "PEOPLE": 32
}

次の関数に state を引き継ぐ部分はこちらです。return で返却すると、次に受け渡してくれます。便利ですね。

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "hello tumbling window",
        }),
        "state": state  # state の返却。次に呼びだされる Lambda Function に引き継がれる
    }

Lambda タンブリングウィンドウの設定

Lambda 関数が作成できたら、Kinesis Data Streams との連携を行います。Add trigger を押します。

image-20220410191148899.png

Kinesis Data Streams を選択して、Batch size などを選択します。

image-20220410191309615.png

タンブリングウィンドウの設定個所はここです。地味なので見逃さないようにしましょう。 この設定値は、30 秒ごとの周期で集計処理を行う設定になります。最大 15分まで指定可能です。

image-20220410191432994.png

add を押します。

image-20220410191443855.png

追加されました。

image-20220410191502555.png

DynamoDB Table の作成

Lambda が最終的に書き込む DymamoDB Table を用意します。Create table を押します。

image-20220410212732116.png

次の値を入れます。

lambda_tumbling_window
roomid
data_start

image-20220410214936658.png

Create Table

image-20220410212850896.png

作成されました

image-20220410212935400.png

動作確認

疑似 IoT デバイスとして用意した、Amazon Linux2 で以下のプログラムを動かします。

cd ~/temp/kinesis-data-analytics
python3 device_main.py --device_name kinesis-test-device --endpoint aa49ciipg36pc-ats.iot.ap-northeast-1.amazonaws.com

1秒おきに、IoT Core にデータが送られ、Kinesis 経由で Lambda が起動します。

Lambda が正常に稼働して、DynamoDB にリアルタイムな集計結果が格納されました

  • Lambda のタンブリングウィンドウで設定したとおり、30 秒おきに平均滞在人数が集計されている
  • shard 単位の集計となっており、shard を超えた集計は出来ない

image-20220410221417603.png

検証を通じてわかったこと

  • Lambda タンブリングウィンドウを使うことで、面倒なストリーミングデータのリアルタイム集計が楽になる
  • Lambda タンブリングウィンドウの設定は、Kinesis Data Streams の shard 単位の集計となっており、shard を超えた集計は出来ない
    • 超えて集計したい場合は、DynamoDB などに出力したあとに集計処理を行うとよいかも。
  • タンブリングウィンドウの最大は 15 分
  • state の上限は 1MB。これを超えると早期終了する。
  • もしくは、Kinesis Data Analytics で集計を行うとシャードを超えて集計が可能と思われる。
  • Kinesis Lambda の連携は、1秒に1回ポーリング

付録 : Lambda が受け取る event の中身

Lambda 関数が受け取る Event を載せます。

初回呼び出し : state なし

  • Kinesis Data Streams から10件のデータを受け取っている
  • isFinalInvokeForWindow : false
  • state : 初回呼び出しなので空っぽ
  • window : 開始時間と終了時間が格納されている
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "88b6423a-55a8-4f0a-9ad4-9439b7b7ffa1",
                "sequenceNumber": "49628438283851452250018774692429789180601711843174514690",
                "data": "eyJST09NIjogMSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDg0fQ==",
                "approximateArrivalTimestamp": 1649583654.921
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018774692429789180601711843174514690",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "453140e9-cf6f-4c59-9def-91a80ea1bdce",
                "sequenceNumber": "49628438283851452250018774692434624883880170359873339394",
                "data": "eyJST09NIjogMiwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDIzfQ==",
                "approximateArrivalTimestamp": 1649583654.942
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018774692434624883880170359873339394",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "37f3a705-4a04-4b82-b28f-18e5f8d1f121",
                "sequenceNumber": "49628438283851452250018774692437042735519399618222751746",
                "data": "eyJST09NIjogNCwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDY5fQ==",
                "approximateArrivalTimestamp": 1649583654.945
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018774692437042735519399618222751746",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1bc63c1e-b68b-4751-9f17-9b76dd345d52",
                "sequenceNumber": "49628438283851452250018774692438251661339014247397457922",
                "data": "eyJST09NIjogNiwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDI3fQ==",
                "approximateArrivalTimestamp": 1649583654.945
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018774692438251661339014247397457922",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "cc806a2d-e967-45ca-8cc3-502d51792169",
                "sequenceNumber": "49628438283851452250018774692441878438797858203641053186",
                "data": "eyJST09NIjogNSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDE0fQ==",
                "approximateArrivalTimestamp": 1649583654.95
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018774692441878438797858203641053186",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "9a0402c5-aaac-47f7-bf6e-376192052706",
                "sequenceNumber": "49628438283851452250018774692443087364617472764096282626",
                "data": "eyJST09NIjogOSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDg1fQ==",
                "approximateArrivalTimestamp": 1649583654.953
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018774692443087364617472764096282626",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "cf965cfc-0f1e-44f6-a508-25b7450e2de5",
                "sequenceNumber": "49628438283851452250018774692445505216256702022445694978",
                "data": "eyJST09NIjogNywgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDEwMH0=",
                "approximateArrivalTimestamp": 1649583654.965
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018774692445505216256702022445694978",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "11b49d8b-bc6a-4ec2-ab7f-3dd774d62860",
                "sequenceNumber": "49628438283851452250018774692467265881009765416309882882",
                "data": "eyJST09NIjogMywgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDU3fQ==",
                "approximateArrivalTimestamp": 1649583654.982
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018774692467265881009765416309882882",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "4bb30523-c946-4572-b23c-51cbf69f8136",
                "sequenceNumber": "49628438283851452250018774692469683732648994605939818498",
                "data": "eyJST09NIjogOCwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU0IiwgIlBFT1BMRSI6IDgxfQ==",
                "approximateArrivalTimestamp": 1649583654.984
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018774692469683732648994605939818498",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "b0214d52-2125-41b6-ac86-02a8c5270a87",
                "sequenceNumber": "49628438283851452250018774693182949966221625887735939074",
                "data": "eyJST09NIjogNSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDE4OjQwOjU1IiwgIlBFT1BMRSI6IDk4fQ==",
                "approximateArrivalTimestamp": 1649583655.935
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018774693182949966221625887735939074",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/event-print-EventPrintFunctionRole-1443LILCH2H9K",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        }
    ],
    "shardId": "shardId-000000000000",
    "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source",
    "window": {
        "start": "2022-04-10T09:40:30Z",
        "end": "2022-04-10T09:41:00Z"
    },
    "state": {},
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}

途中の呼び出し : state あり

  • Kinesis Data Streams から10件のデータを受け取っている
  • isFinalInvokeForWindow : false
  • state : 2回目以降の呼び出しなので、state を受け取っている
  • window : 開始時間と終了時間が格納されている
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "e6965fe8-99fb-4b30-bc7e-ec15e0701005",
                "sequenceNumber": "49628438283851452250018780975352673409626969805380648962",
                "data": "eyJST09NIjogMiwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjAzIiwgIlBFT1BMRSI6IDg2fQ==",
                "approximateArrivalTimestamp": 1649592364.071
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018780975352673409626969805380648962",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "7d948f85-5a1d-49e6-ab87-b53ef2378289",
                "sequenceNumber": "49628438283851452250018780975353882335446584434555355138",
                "data": "eyJST09NIjogOSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjAzIiwgIlBFT1BMRSI6IDkzfQ==",
                "approximateArrivalTimestamp": 1649592364.071
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018780975353882335446584434555355138",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "b998ace7-3ae7-4dbe-a4e3-f96b7bd14240",
                "sequenceNumber": "49628438283851452250018780975356300187085813692904767490",
                "data": "eyJST09NIjogNywgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjAzIiwgIlBFT1BMRSI6IDY3fQ==",
                "approximateArrivalTimestamp": 1649592364.074
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018780975356300187085813692904767490",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "d68f6905-91c8-47f3-8d34-3efdb0b311b3",
                "sequenceNumber": "49628438283851452250018780975357509112905428322079473666",
                "data": "eyJST09NIjogNCwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjAzIiwgIlBFT1BMRSI6IDkzfQ==",
                "approximateArrivalTimestamp": 1649592364.074
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018780975357509112905428322079473666",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "7f2b045e-4518-4584-8ef3-004ba79b9f83",
                "sequenceNumber": "49628438283851452250018780975380478703478106276398891010",
                "data": "eyJST09NIjogOCwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjAzIiwgIlBFT1BMRSI6IDIyfQ==",
                "approximateArrivalTimestamp": 1649592364.112
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018780975380478703478106276398891010",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "51dda732-fb0c-416a-bce4-0c9b000baa4b",
                "sequenceNumber": "49628438283851452250018780976003075500579640370092048386",
                "data": "eyJST09NIjogMSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjA0IiwgIlBFT1BMRSI6IDYyfQ==",
                "approximateArrivalTimestamp": 1649592365.046
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018780976003075500579640370092048386",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "49ace2dc-bdbf-4471-9e26-7e3c623aeaeb",
                "sequenceNumber": "49628438283851452250018780976005493352218869628441460738",
                "data": "eyJST09NIjogOCwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjA0IiwgIlBFT1BMRSI6IDYxfQ==",
                "approximateArrivalTimestamp": 1649592365.049
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018780976005493352218869628441460738",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "d4807f04-f3cc-4d06-9432-cdb8f7a08eee",
                "sequenceNumber": "49628438283851452250018780976011537981316942774314991618",
                "data": "eyJST09NIjogOSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjA0IiwgIlBFT1BMRSI6IDkzfQ==",
                "approximateArrivalTimestamp": 1649592365.059
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018780976011537981316942774314991618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "325b243e-55c8-4bcd-a569-a8566084194c",
                "sequenceNumber": "49628438283851452250018780976012746907136557403489697794",
                "data": "eyJST09NIjogNSwgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjA0IiwgIlBFT1BMRSI6IDE5fQ==",
                "approximateArrivalTimestamp": 1649592365.06
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018780976012746907136557403489697794",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "28a6021a-a682-4637-846d-679bfb82d4a6",
                "sequenceNumber": "49628438283851452250018780976013955832956172032664403970",
                "data": "eyJST09NIjogMywgIkVWRU5UVElNRSI6ICIyMDIyLTA0LTEwVDIxOjA2OjA0IiwgIlBFT1BMRSI6IDUxfQ==",
                "approximateArrivalTimestamp": 1649592365.062
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49628438283851452250018780976013955832956172032664403970",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxx:role/lambda-tumbling-window-TumblingWindowFunctionRole-AAS117PL1SYE",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source"
        }
    ],
    "shardId": "shardId-000000000000",
    "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source",
    "window": {
        "start": "2022-04-10T12:06:00Z",
        "end": "2022-04-10T12:06:30Z"
    },
    "state": {
        "1": {
            "PEOPLE_SUM": 124,
            "RECORD_COUNT": 5,
            "PEOPLE_AVG": 24.8
        },
        "2": {
            "PEOPLE_SUM": 319,
            "RECORD_COUNT": 4,
            "PEOPLE_AVG": 79.75
        },
        "3": {
            "PEOPLE_SUM": 220,
            "RECORD_COUNT": 5,
            "PEOPLE_AVG": 44
        },
        "4": {
            "PEOPLE_SUM": 162,
            "RECORD_COUNT": 4,
            "PEOPLE_AVG": 40.5
        },
        "5": {
            "PEOPLE_SUM": 194,
            "RECORD_COUNT": 5,
            "PEOPLE_AVG": 38.8
        },
        "6": {
            "PEOPLE_SUM": 199,
            "RECORD_COUNT": 5,
            "PEOPLE_AVG": 39.8
        },
        "7": {
            "PEOPLE_SUM": 170,
            "RECORD_COUNT": 4,
            "PEOPLE_AVG": 42.5
        },
        "8": {
            "PEOPLE_SUM": 257,
            "RECORD_COUNT": 4,
            "PEOPLE_AVG": 64.25
        },
        "9": {
            "PEOPLE_SUM": 268,
            "RECORD_COUNT": 4,
            "PEOPLE_AVG": 67
        }
    },
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}

最後の呼び出し : state あり

  • Kinesis Data Streams から**データを受け取らない**
  • isFinalInvokeForWindow : true
  • state : 受け取っている
  • window : 開始時間と終了時間が格納されている
{
    "Records": [],
    "shardId": "shardId-000000000000",
    "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxx:stream/kinesis-data-analytics-source",
    "window": {
        "start": "2022-04-10T12:05:30Z",
        "end": "2022-04-10T12:06:00Z"
    },
    "state": {
        "1": {
            "PEOPLE_SUM": 1829,
            "RECORD_COUNT": 30,
            "PEOPLE_AVG": 60.96666666666667
        },
        "2": {
            "PEOPLE_SUM": 1529,
            "RECORD_COUNT": 30,
            "PEOPLE_AVG": 50.96666666666667
        },
        "3": {
            "PEOPLE_SUM": 1784,
            "RECORD_COUNT": 30,
            "PEOPLE_AVG": 59.46666666666667
        },
        "4": {
            "PEOPLE_SUM": 1665,
            "RECORD_COUNT": 30,
            "PEOPLE_AVG": 55.5
        },
        "5": {
            "PEOPLE_SUM": 1764,
            "RECORD_COUNT": 30,
            "PEOPLE_AVG": 58.8
        },
        "6": {
            "PEOPLE_SUM": 1577,
            "RECORD_COUNT": 30,
            "PEOPLE_AVG": 52.56666666666667
        },
        "7": {
            "PEOPLE_SUM": 1685,
            "RECORD_COUNT": 30,
            "PEOPLE_AVG": 56.166666666666664
        },
        "8": {
            "PEOPLE_SUM": 1632,
            "RECORD_COUNT": 30,
            "PEOPLE_AVG": 54.4
        },
        "9": {
            "PEOPLE_SUM": 1689,
            "RECORD_COUNT": 30,
            "PEOPLE_AVG": 56.3
        }
    },
    "isFinalInvokeForWindow": true,
    "isWindowTerminatedEarly": false
}

参考URL

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
0
Help us understand the problem. What are the problem?