LoginSignup
1
1

More than 1 year has passed since last update.

Amazon Timestreamからboto3でデータを取得する

Posted at

はじめに

センサからのデータをAmazon Timestreamに格納したので、今度はデータを取り出したい。
データの取得方法をLambdaでpython3.9/boto3を使って確認する。

Timestreamテーブル内のデータ

Timestreamは各項目(メジャー)ごとに1レコードを使う形で格納される。
今回の例ではセンサの位置情報(lat/lon)、温度(temp)、湿度(humi)が格納されている。時間間隔は基本的に3分毎。

simId(ディメンション) timestamp(ディメンション) measure_name time measure_value::double
sim_id 1674918682708 lat 2023-01-28 15:11:24.416000000 35.1
sim_id 1674918682708 lon 2023-01-28 15:11:24.416000000 136.8
sim_id 1674918682708 temp 2023-01-28 15:11:24.416000000 20.3
sim_id 1674918682708 humi 2023-01-28 15:11:24.416000000 34.4

今回はセンサが1つしかないためsimIdは常に同じ値をとる。
センサから送られてきたtimestampと、Timestream側で挿入されたtimeがあるが、今回はtimestampを時間データとして扱うこととする。

クエリエディタで確認

Lambdaを組む前に、AWSマネジメントコンソールのTimestream クエリエディタで正しくデータを取れるかどうか確認する。

今回は直近1時間の温度データを取りたいので、以下のようになる。
直近1時間前のタイムスタンプはエポックミリ秒の変換サイトなどで別途計算した。

SELECT * FROM "<データベース名>"."<テーブル名>" WHERE measure_name='temp' AND timestamp>'1674953600000'

※今回の例(GPSマルチユニットからの入力)だとtimestampがvarcharになっていることに注意。
実際のマネジメントコンソール画面の取得結果は↓のようになる。
008_boto3_01.png

Lambdaでboto3の返却値を確認

boto3でデータ取得をした際の返却値を確認する。
Lambdaのコードは以下で試した。

import datetime
import time
import boto3

def lambda_handler(event, context):

    now = datetime.datetime.now()
    past = now + datetime.timedelta(hours=-1)
    
    millisec = int(time.mktime(past.timetuple()) * 1000)
    
    query = 'SELECT * FROM "<データベース名>"."<テーブル名>" WHERE measure_name=\'temp\' AND timestamp>\'' + str(millisec) + '\''

    timestream_query_client = boto3.client('timestream-query', 
        aws_access_key_id="..........",
        aws_secret_access_key="................")
        
    response = timestream_query_client.query(QueryString=query)
    
    return response

返却値は辞書型で返ってくる。内容は↓

{
  "QueryId": "AEIACANJ635U...PUN7TVJ3OAEEBA",
  "Rows": [
    {
      "Data": [
        {
          "ScalarValue": "8981100005817821334"
        },
        {
          "ScalarValue": "1674958114056"
        },
        {
          "ScalarValue": "temp"
        },
        {
          "ScalarValue": "2023-01-29 02:08:35.444000000"
        },
        {
          "ScalarValue": "22.1"
        }
      ]
    },
    {
      "Data": [
        {
          "ScalarValue": "8981100005817821334"
        },
        {
          "ScalarValue": "1674961215559"
        },
        {
          "ScalarValue": "temp"
        },
        {
          "ScalarValue": "2023-01-29 03:00:17.095000000"
        },
        {
          "ScalarValue": "22.8"
        }
      ]
    },
        
        ・(中略)
        
    {
      "Data": [
        {
          "ScalarValue": "8981100005817821334"
        },
        {
          "ScalarValue": "1674961529911"
        },
        {
          "ScalarValue": "temp"
        },
        {
          "ScalarValue": "2023-01-29 03:05:31.643000000"
        },
        {
          "ScalarValue": "23.1"
        }
      ]
    }
  ],
  "ColumnInfo": [
    {
      "Name": "simId",
      "Type": {
        "ScalarType": "VARCHAR"
      }
    },
    {
      "Name": "timestamp",
      "Type": {
        "ScalarType": "VARCHAR"
      }
    },
    {
      "Name": "measure_name",
      "Type": {
        "ScalarType": "VARCHAR"
      }
    },
    {
      "Name": "time",
      "Type": {
        "ScalarType": "TIMESTAMP"
      }
    },
    {
      "Name": "measure_value::double",
      "Type": {
        "ScalarType": "DOUBLE"
      }
    }
  ],
  "QueryStatus": {
    "ProgressPercentage": 100,
    "CumulativeBytesScanned": 864,
    "CumulativeBytesMetered": 10000000
  },
  "ResponseMetadata": {
    "RequestId": "2YEFKLNT43AC34JXI74Q6IR2MI",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amzn-requestid": "2YEFKLNT43AC34JXI74Q6IR2MI",
      "content-type": "application/x-amz-json-1.0",
      "content-length": "3629",
      "date": "Sun, 29 Jan 2023 03:07:47 GMT"
    },
    "RetryAttempts": 0
  }
}

取得した辞書型データをpandas DataFrameに変換する

返却値そのままではDataFrameにできないので、変換処理を入れる。
クエリ~変換部分のコードは以下:

    response = timestream_query_client.query(QueryString=query)
    
    rows = response['Rows']
    d = {}
    for i, row in enumerate(rows):
        
        d[i] = {'timestamp' : row['Data'][1]['ScalarValue'], 'temp' : row['Data'][4]['ScalarValue'], 'simId' : row['Data'][0]['ScalarValue']}

    df = pd.DataFrame(data=d).T
    print(df)

出力された実行結果:

timestamp  temp                simId
0   1674971617095  21.8  898110............
1   1674971797090  21.7  898110............
2   1674971977264  22.2  898110............
3   1674972162225  21.9  898110............
4   1674972377265  22.3  898110............
5   1674972512299  21.9  898110............
6   1674972697352  21.9  898110............
7   1674972872389  22.2  898110............
8   1674973052305  21.8  898110............
9   1674973357351  22.3  898110............
10  1674973422458  22.6  898110............
11  1674973607497  22.2  898110............
12  1674973947508  22.6  898110............
13  1674974132554  22.4  898110............
14  1674974312712  21.9  898110............
15  1674974497578  22.1  898110............
16  1674974747683  22.4  898110............
17  1674974867968  22.6  898110............
18  1674975037670  22.3  898110............

無事にTimestreamからデータを取得してDataFrame化できました!

1
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1