はじめに
センサからのデータをAmazon Timestreamに格納したので、今度はデータを取り出したい。
データの取得方法をLambdaでpython3.9/boto3を使って確認する。
Timestreamテーブル内のデータ
Timestreamは各項目(メジャー)ごとに1レコードを使う形で格納される。
今回の例ではセンサの位置情報(lat/lon)、温度(temp)、湿度(humi)が格納されている。時間間隔は基本的に3分毎。
simId(ディメンション) | timestamp(ディメンション) | measure_name | time | measure_value::double |
---|---|---|---|---|
sim_id | 1674918682708 | lat | 2023-01-28 15:11:24.416000000 | 35.1 |
sim_id | 1674918682708 | lon | 2023-01-28 15:11:24.416000000 | 136.8 |
sim_id | 1674918682708 | temp | 2023-01-28 15:11:24.416000000 | 20.3 |
sim_id | 1674918682708 | humi | 2023-01-28 15:11:24.416000000 | 34.4 |
今回はセンサが1つしかないためsimIdは常に同じ値をとる。
センサから送られてきたtimestampと、Timestream側で挿入されたtimeがあるが、今回はtimestampを時間データとして扱うこととする。
クエリエディタで確認
Lambdaを組む前に、AWSマネジメントコンソールのTimestream クエリエディタで正しくデータを取れるかどうか確認する。
今回は直近1時間の温度データを取りたいので、以下のようになる。
直近1時間前のタイムスタンプはエポックミリ秒の変換サイトなどで別途計算した。
SELECT * FROM "<データベース名>"."<テーブル名>" WHERE measure_name='temp' AND timestamp>'1674953600000'
※今回の例(GPSマルチユニットからの入力)だとtimestampがvarcharになっていることに注意。
実際のマネジメントコンソール画面の取得結果は↓のようになる。
Lambdaでboto3の返却値を確認
boto3でデータ取得をした際の返却値を確認する。
Lambdaのコードは以下で試した。
import datetime
import time
import boto3
def lambda_handler(event, context):
now = datetime.datetime.now()
past = now + datetime.timedelta(hours=-1)
millisec = int(time.mktime(past.timetuple()) * 1000)
query = 'SELECT * FROM "<データベース名>"."<テーブル名>" WHERE measure_name=\'temp\' AND timestamp>\'' + str(millisec) + '\''
timestream_query_client = boto3.client('timestream-query',
aws_access_key_id="..........",
aws_secret_access_key="................")
response = timestream_query_client.query(QueryString=query)
return response
返却値は辞書型で返ってくる。内容は↓
{
"QueryId": "AEIACANJ635U...PUN7TVJ3OAEEBA",
"Rows": [
{
"Data": [
{
"ScalarValue": "8981100005817821334"
},
{
"ScalarValue": "1674958114056"
},
{
"ScalarValue": "temp"
},
{
"ScalarValue": "2023-01-29 02:08:35.444000000"
},
{
"ScalarValue": "22.1"
}
]
},
{
"Data": [
{
"ScalarValue": "8981100005817821334"
},
{
"ScalarValue": "1674961215559"
},
{
"ScalarValue": "temp"
},
{
"ScalarValue": "2023-01-29 03:00:17.095000000"
},
{
"ScalarValue": "22.8"
}
]
},
・
・(中略)
・
{
"Data": [
{
"ScalarValue": "8981100005817821334"
},
{
"ScalarValue": "1674961529911"
},
{
"ScalarValue": "temp"
},
{
"ScalarValue": "2023-01-29 03:05:31.643000000"
},
{
"ScalarValue": "23.1"
}
]
}
],
"ColumnInfo": [
{
"Name": "simId",
"Type": {
"ScalarType": "VARCHAR"
}
},
{
"Name": "timestamp",
"Type": {
"ScalarType": "VARCHAR"
}
},
{
"Name": "measure_name",
"Type": {
"ScalarType": "VARCHAR"
}
},
{
"Name": "time",
"Type": {
"ScalarType": "TIMESTAMP"
}
},
{
"Name": "measure_value::double",
"Type": {
"ScalarType": "DOUBLE"
}
}
],
"QueryStatus": {
"ProgressPercentage": 100,
"CumulativeBytesScanned": 864,
"CumulativeBytesMetered": 10000000
},
"ResponseMetadata": {
"RequestId": "2YEFKLNT43AC34JXI74Q6IR2MI",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid": "2YEFKLNT43AC34JXI74Q6IR2MI",
"content-type": "application/x-amz-json-1.0",
"content-length": "3629",
"date": "Sun, 29 Jan 2023 03:07:47 GMT"
},
"RetryAttempts": 0
}
}
取得した辞書型データをpandas DataFrameに変換する
返却値そのままではDataFrameにできないので、変換処理を入れる。
クエリ~変換部分のコードは以下:
response = timestream_query_client.query(QueryString=query)
rows = response['Rows']
d = {}
for i, row in enumerate(rows):
d[i] = {'timestamp' : row['Data'][1]['ScalarValue'], 'temp' : row['Data'][4]['ScalarValue'], 'simId' : row['Data'][0]['ScalarValue']}
df = pd.DataFrame(data=d).T
print(df)
出力された実行結果:
timestamp temp simId
0 1674971617095 21.8 898110............
1 1674971797090 21.7 898110............
2 1674971977264 22.2 898110............
3 1674972162225 21.9 898110............
4 1674972377265 22.3 898110............
5 1674972512299 21.9 898110............
6 1674972697352 21.9 898110............
7 1674972872389 22.2 898110............
8 1674973052305 21.8 898110............
9 1674973357351 22.3 898110............
10 1674973422458 22.6 898110............
11 1674973607497 22.2 898110............
12 1674973947508 22.6 898110............
13 1674974132554 22.4 898110............
14 1674974312712 21.9 898110............
15 1674974497578 22.1 898110............
16 1674974747683 22.4 898110............
17 1674974867968 22.6 898110............
18 1674975037670 22.3 898110............
無事にTimestreamからデータを取得してDataFrame化できました!