以前の投稿の更新版です。前の仕組みでは一つのデータに対しての集計しかけられなかったのですが、それでは実運用上あまりに不便、と思ったので、指定した値でグルーピングできるようにしてみました。
AWS LambdaでDynamoDBから取得した値に任意の集計をかける
インプットデータのフォーマット変更
ほとんど使い方は以前のものと一緒ですが、以下の点だけ変えました。
- IDは配列形式(["sensor1", "sensor2"])で指定するように仕様変更した。
- テーブル名を環境変数から取得するようにした。
IDは配列形式(["sensor1", "sensor2"])で指定するように仕様変更した
こんな感じが最新のフォーマットです。
{
"label_id": "id",
"label_range": "timestamp",
"id": [
"sensor1",
"sensor2"
],
"aggregator": "latest",
"time_from": "2017-04-30T22:00:00.000",
"time_to": "2017-04-30T22:06:00.000",
"params": {
"range": "timestamp"
}
}
IDの部分を配列にしました。こうすることで指定したIDの最新値を取得することが可能になります。
戻り値はこんな感じになります。
"[{\"timestamp\": \"2017-04-30T22:05:00.000\", \"score\": 0.0, \"id\": \"sensor1\"}, {\"timestamp\": \"2017-04-30T22:06:00.000\", \"score\": 1.0, \"id\": \"sensor2\"}]"
ちなみにDynamoDBにはこんな感じの値を用意していました。
テーブル名を環境変数から取得するようにした
そのまんまです。 handler.pyの os.environ['TABLE']
の部分です。Lambda実行時に環境変数をこんな感じで指定してください。
handler.py
import sys
import boto3
import json
import decimal
import os
from boto3.dynamodb.conditions import Key
from aggregator.lambda_aggregator import LambdaAggregator
from aggregator.latest_aggregator import LatestAggregator
from aggregator.max_aggregator import MaxAggregator
from aggregator.min_aggregator import MinAggregator
from aggregator.sum_aggregator import SumAggregator
from aggregator.avg_aggregator import AvgAggregator
from aggregator.count_aggregator import CountAggregator
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE'])
aggregator_map = {}
aggregator_map['latest'] = LatestAggregator()
aggregator_map['max'] = MaxAggregator()
aggregator_map['min'] = MinAggregator()
aggregator_map['sum'] = SumAggregator()
aggregator_map['avg'] = AvgAggregator()
aggregator_map['count'] = CountAggregator()
def run(event, context):
check_params(event)
result = []
for id in event['id']:
res = table.query(
KeyConditionExpression=Key(event['label_id']).eq(id) & Key(event['label_range']).between(event['time_from'], event['time_to']),
ScanIndexForward=False
)
return_response = aggregator_map[event['aggregator']].aggregate(res['Items'], event['params'])
result.append(return_response)
return json.dumps(result, default=decimal_default)
def decimal_default(obj):
if isinstance(obj, decimal.Decimal):
return float(obj)
raise TypeError
def check_params(params):
if 'label_id' not in params or 'label_range' not in params or 'id' not in params or 'aggregator' not in params or 'time_from' not in params or 'time_to' not in params or 'params' not in params:
sys.stderr.write("Parameters for label_id, label_range, id, aggregator, time_from, time_to and params are needed.")
sys.exit()
ソース
こちらにコミットしています。(以前のものを更新)