AWS LambdaでDynamoDBから取得した値に任意の集計をかける(グルーピング処理追加)

  • 2
    いいね
  • 0
    コメント

以前の投稿の更新版です。前の仕組みでは一つのデータに対しての集計しかけられなかったのですが、それでは実運用上あまりに不便、と思ったので、指定した値でグルーピングできるようにしてみました。

AWS LambdaでDynamoDBから取得した値に任意の集計をかける

インプットデータのフォーマット変更

ほとんど使い方は以前のものと一緒ですが、以下の点だけ変えました。

  1. IDは配列形式(["sensor1", "sensor2"])で指定するように仕様変更した。
  2. テーブル名を環境変数から取得するようにした。

IDは配列形式(["sensor1", "sensor2"])で指定するように仕様変更した

こんな感じが最新のフォーマットです。

{
  "label_id": "id",
  "label_range": "timestamp",
  "id": [
    "sensor1",
    "sensor2"
  ],
  "aggregator": "latest",
  "time_from": "2017-04-30T22:00:00.000",
  "time_to": "2017-04-30T22:06:00.000",
  "params": {
    "range": "timestamp"
  }
}

IDの部分を配列にしました。こうすることで指定したIDの最新値を取得することが可能になります。
戻り値はこんな感じになります。

"[{\"timestamp\": \"2017-04-30T22:05:00.000\", \"score\": 0.0, \"id\": \"sensor1\"}, {\"timestamp\": \"2017-04-30T22:06:00.000\", \"score\": 1.0, \"id\": \"sensor2\"}]"

ちなみにDynamoDBにはこんな感じの値を用意していました。

スクリーンショット 2017-07-19 18.12.43.png

テーブル名を環境変数から取得するようにした

そのまんまです。 handler.pyの os.environ['TABLE'] の部分です。Lambda実行時に環境変数をこんな感じで指定してください。

スクリーンショット 2017-07-19 18.13.46.png

handler.py
import sys
import boto3
import json
import decimal
import os
from boto3.dynamodb.conditions import Key

from aggregator.lambda_aggregator import LambdaAggregator
from aggregator.latest_aggregator import LatestAggregator
from aggregator.max_aggregator import MaxAggregator
from aggregator.min_aggregator import MinAggregator
from aggregator.sum_aggregator import SumAggregator
from aggregator.avg_aggregator import AvgAggregator
from aggregator.count_aggregator import CountAggregator

dynamodb = boto3.resource('dynamodb')
table    = dynamodb.Table(os.environ['TABLE'])

aggregator_map = {}
aggregator_map['latest'] = LatestAggregator()
aggregator_map['max'] = MaxAggregator()
aggregator_map['min'] = MinAggregator()
aggregator_map['sum'] = SumAggregator()
aggregator_map['avg'] = AvgAggregator()
aggregator_map['count'] = CountAggregator()

def run(event, context):
    check_params(event)
    result = []

    for id in event['id']:
        res = table.query(
                KeyConditionExpression=Key(event['label_id']).eq(id) & Key(event['label_range']).between(event['time_from'], event['time_to']),
                ScanIndexForward=False
            )

        return_response = aggregator_map[event['aggregator']].aggregate(res['Items'], event['params'])
        result.append(return_response)

    return json.dumps(result, default=decimal_default)

def decimal_default(obj):
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    raise TypeError

def check_params(params):
    if 'label_id' not in params or 'label_range' not in params or 'id' not in params or 'aggregator' not in params or 'time_from' not in params or 'time_to' not in params or 'params' not in params:
        sys.stderr.write("Parameters for label_id, label_range, id, aggregator, time_from, time_to and params are needed.")
        sys.exit()

ソース

こちらにコミットしています。(以前のものを更新)

https://github.com/kojiisd/lambda-dynamodb-aggregator