LoginSignup
3
3

More than 5 years have passed since last update.

AWS LambdaでDynamoDBから取得した値に任意の集計をかける(グルーピング処理追加)

Posted at

以前の投稿の更新版です。前の仕組みでは一つのデータに対しての集計しかけられなかったのですが、それでは実運用上あまりに不便、と思ったので、指定した値でグルーピングできるようにしてみました。

AWS LambdaでDynamoDBから取得した値に任意の集計をかける

インプットデータのフォーマット変更

ほとんど使い方は以前のものと一緒ですが、以下の点だけ変えました。

  1. IDは配列形式(["sensor1", "sensor2"])で指定するように仕様変更した。
  2. テーブル名を環境変数から取得するようにした。

IDは配列形式(["sensor1", "sensor2"])で指定するように仕様変更した

こんな感じが最新のフォーマットです。

{
  "label_id": "id",
  "label_range": "timestamp",
  "id": [
    "sensor1",
    "sensor2"
  ],
  "aggregator": "latest",
  "time_from": "2017-04-30T22:00:00.000",
  "time_to": "2017-04-30T22:06:00.000",
  "params": {
    "range": "timestamp"
  }
}

IDの部分を配列にしました。こうすることで指定したIDの最新値を取得することが可能になります。
戻り値はこんな感じになります。

"[{\"timestamp\": \"2017-04-30T22:05:00.000\", \"score\": 0.0, \"id\": \"sensor1\"}, {\"timestamp\": \"2017-04-30T22:06:00.000\", \"score\": 1.0, \"id\": \"sensor2\"}]"

ちなみにDynamoDBにはこんな感じの値を用意していました。

スクリーンショット 2017-07-19 18.12.43.png

テーブル名を環境変数から取得するようにした

そのまんまです。 handler.pyの os.environ['TABLE'] の部分です。Lambda実行時に環境変数をこんな感じで指定してください。

スクリーンショット 2017-07-19 18.13.46.png

handler.py
import sys
import boto3
import json
import decimal
import os
from boto3.dynamodb.conditions import Key

from aggregator.lambda_aggregator import LambdaAggregator
from aggregator.latest_aggregator import LatestAggregator
from aggregator.max_aggregator import MaxAggregator
from aggregator.min_aggregator import MinAggregator
from aggregator.sum_aggregator import SumAggregator
from aggregator.avg_aggregator import AvgAggregator
from aggregator.count_aggregator import CountAggregator

dynamodb = boto3.resource('dynamodb')
table    = dynamodb.Table(os.environ['TABLE'])

aggregator_map = {}
aggregator_map['latest'] = LatestAggregator()
aggregator_map['max'] = MaxAggregator()
aggregator_map['min'] = MinAggregator()
aggregator_map['sum'] = SumAggregator()
aggregator_map['avg'] = AvgAggregator()
aggregator_map['count'] = CountAggregator()

def run(event, context):
    check_params(event)
    result = []

    for id in event['id']:
        res = table.query(
                KeyConditionExpression=Key(event['label_id']).eq(id) & Key(event['label_range']).between(event['time_from'], event['time_to']),
                ScanIndexForward=False
            )

        return_response = aggregator_map[event['aggregator']].aggregate(res['Items'], event['params'])
        result.append(return_response)

    return json.dumps(result, default=decimal_default)

def decimal_default(obj):
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    raise TypeError

def check_params(params):
    if 'label_id' not in params or 'label_range' not in params or 'id' not in params or 'aggregator' not in params or 'time_from' not in params or 'time_to' not in params or 'params' not in params:
        sys.stderr.write("Parameters for label_id, label_range, id, aggregator, time_from, time_to and params are needed.")
        sys.exit()

ソース

こちらにコミットしています。(以前のものを更新)

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3