目次
1. この記事の内容
2. Snowflakeへ保存するデータ
3. OuraRingのAPIからデータを取得する
4. 取得したデータをSnowflakeのTableにInsertする
5. Pythonのコード全体
6. Dailyバッチ化する
7. おわりに
この記事の内容
先日Oura Ringという、バイタルデータを計測できるスマートリングを購入しました。
この様に専用のアプリで、いろんなデータを閲覧することができるのですが、データを取得できるAPIがあるので、Snowflakeへ取り込んでdbtで集計し、Lightdash or Tableauで可視化するまでをやってみようと思います。
この記事はその第一弾として、Oura RingのデータをPythonで取得し、Snowpark
を使用してSnowflakeに保存するまでをやってみたものです。
Snowflakeへ保存するデータ
APIで取得するOuraRingのデータ
以下のデータをSnowflakeへ取り込んでいます。データの内容についてはまだ理解が及んでいないので後日加筆修正するかもしれません。参考
endpoint | データの内容 |
---|---|
daily_activity | 毎日の活動のサマリー。1日の消費カロリー、休憩時間など。 |
daily_readiness | その日に対する準備がどの程度整っているか。アプリの画面で閲覧できるコンディションの値や前日の睡眠の質の情報が入っているっぽい? |
daily_sleep | 睡眠時間のサマリー。昼寝の時間も含む、1日分の集計値。 |
heartrate | 心拍数のデータ |
sleep | daily_sleepとは違い、各睡眠時間の情報が入っている (複数回昼寝したら、そのそれぞれの情報が取得できる) |
workout | サイクリングやランニングなどのトレーニングの情報 |
personal_info | ユーザ情報。ユーザ識別子やメールアドレス。登録している年齢など。 |
Snowflakeへ保存する際のデータ構造
各endpointから得られるデータをそれぞれ個別のテーブルに格納する。ただし全て同じテーブルにしている。つまり、各データの固有の仕様に基づいた加工処理は全て dbt
で実施する前提。
colmun name | description |
---|---|
time | データを取得する処理を行ったUNIXTIME。取り込み処理を行う度に変化する。 |
processd_date | (列名が微妙ですが…)データの対象日付。同じ日付のデータを後日取り込み直してもこの日付は変わらない。 |
user | personal_infoから取得するユーザ識別子 |
data | APIの各endpointから取得したデータをVARIANTで格納 |
OuraRingのAPIからデータを取得する
Oura RingのAPIからのデータ取得についてはouraというPackageがあるのですが、今回はそれを使用せず自分でコードを書いています。一部対応していないデータがあったためです。
処理としては以下のような感じで素直に実装しています。趣味開発なのもあり、エラーハンドリングは一切していません。
- Requestsを使ってendpointにアクセス
- 認証はPersonalAccessTokenを使用 (ここから取得できます)
- 取得するデータの範囲は前日の1日分のみ
- 各endpointからのresponseをjsonにして返却する
import json
import os
import time
from datetime import datetime, timedelta, timezone
import pandas as pd
import requests
def today():
JST = timezone(timedelta(hours=+9), "JST")
dt_now = datetime.now(JST)
today = dt_now.strftime("%Y-%m-%d")
return today
def yesterday():
JST = timezone(timedelta(hours=+9), "JST")
dt_now = datetime.now(JST)
yesterday = (dt_now - timedelta(1)).strftime("%Y-%m-%d")
return yesterday
class Oura:
def __init__(
self,
personal_access_token: str,
start_date: str = yesterday(),
end_date: str = today(),
):
self.processed_date = start_date
self.endpoint = "https://api.ouraring.com/v2/usercollection/"
self.headers = {"Authorization": f"Bearer {personal_access_token}"}
self.params = {"start_date": start_date, "end_date": end_date}
self.uid = json.loads(
requests.request(
"GET", self.endpoint + "personal_info", headers=self.headers
).text
)["id"]
def request_api(self, method):
res = requests.request(
"GET", self.endpoint + method, headers=self.headers, params=self.params
)
if method == "personal_info":
data = [json.loads(res.text)]
else:
data = json.loads(res.text)["data"]
return {
"time": time.time(),
"processed_date": self.processed_date,
"user": self.uid,
"data": data,
}
取得したデータをSnowflakeのTableにInsertする
Snowpark
を使用しています。pandas DataFrameとテーブル名を受け取り、そのテーブルにAppendするという単純な処理にしています。これだと再実行時にデータが重複しますが、そのあたりのハンドリングはdbt
で対処します。
Snowpark
を使用してデータを挿入する際、テーブルが存在しない場合はDataFrameの型に従って、テーブルが作成されます (なのでCreate Tableはしていません)。これは非常に便利ですね。
参考: snowpark.DataFrameWriter
参考: snowpark.DataFrameWriter.mode
from snowflake.snowpark.session import Session
def insert_snowflake_table(df, table_name):
sf_params = {
"account": os.environ["SF_ACCOUNT"],
"user": os.environ["SF_USER"],
"password": os.environ["SF_PASSWORD"],
"role": os.environ["SF_ROLE"],
"warehouse": os.environ["SF_WAREHOUSE"],
"database": os.environ["SF_DATABASE"],
"schema": os.environ["SF_SCHEMA"],
}
session = Session.builder.configs(sf_params).create()
df = session.create_dataframe(df)
df.write.mode("append").save_as_table(table_name)
実行部分
データを取得したendpointに順番にアクセスして、取得したデータをDataFrameにしてからSnowflakeに書き込んで言っているだけです。
def main():
personal_access_token = os.environ["PERSONAL_ACCESS_TOKEN"]
client = Oura(
personal_access_token=personal_access_token,
start_date=yesteraday(),
end_date=today()
)
methods = [
"daily_activity",
"daily_readiness",
"daily_sleep",
"heartrate",
"personal_info",
"sleep",
"workout",
]
for method in methods:
res = client.request_api(method)
insert_snowflake_table(pd.DataFrame(res), method)
if __name__ == "__main__":
main()
Pythonのコード全体
コード全体は以下のとおりです
サンプルコード
import json
import os
import time
from datetime import datetime, timedelta, timezone
import pandas as pd
import requests
from snowflake.snowpark.session import Session
def today():
JST = timezone(timedelta(hours=+9), "JST")
dt_now = datetime.now(JST)
today = dt_now.strftime("%Y-%m-%d")
return today
def yesterday():
JST = timezone(timedelta(hours=+9), "JST")
dt_now = datetime.now(JST)
yesterday = (dt_now - timedelta(1)).strftime("%Y-%m-%d")
return yesterday
class Oura:
def __init__(
self,
personal_access_token: str,
start_date: str = yesterday(),
end_date: str = today(),
):
self.processed_date = start_date
self.endpoint = "https://api.ouraring.com/v2/usercollection/"
self.headers = {"Authorization": f"Bearer {personal_access_token}"}
self.params = {"start_date": start_date, "end_date": end_date}
self.uid = json.loads(
requests.request(
"GET", self.endpoint + "personal_info", headers=self.headers
).text
)["id"]
def request_api(self, method):
res = requests.request(
"GET", self.endpoint + method, headers=self.headers, params=self.params
)
if method == "personal_info":
data = [json.loads(res.text)]
else:
data = json.loads(res.text)["data"]
return {
"time": time.time(),
"processed_date": self.processed_date,
"user": self.uid,
"data": data,
}
def insert_snowflake_table(df, table_name):
sf_params = {
"account": os.environ["SF_ACCOUNT"],
"user": os.environ["SF_USER"],
"password": os.environ["SF_PASSWORD"],
"role": os.environ["SF_ROLE"],
"warehouse": os.environ["SF_WAREHOUSE"],
"database": os.environ["SF_DATABASE"],
"schema": os.environ["SF_SCHEMA"],
}
session = Session.builder.configs(sf_params).create()
df = session.create_dataframe(df)
df.write.mode("append").save_as_table(table_name)
def main():
personal_access_token = os.environ["PERSONAL_ACCESS_TOKEN"]
client = Oura(
personal_access_token=personal_access_token,
start_date=yesterday(),
end_date=today()
)
methods = [
"daily_activity",
"daily_readiness",
"daily_sleep",
"heartrate",
"personal_info",
"sleep",
"workout",
]
for method in methods:
res = client.request_api(method)
insert_snowflake_table(pd.DataFrame(res), method)
if __name__ == "__main__":
main()
これを実行すると以下のようにSnowflake上にテーブルが作成され、データも書き込まれていきます。
作成したテーブル一覧
テーブル定義
Insertされたデータ
Dailyバッチ化する
毎日JST10時にデータを取得する様に、Github Actionsで動かすように設定しています。
定義は以下の内容です。
name: fetch_oura_ring_data
on:
schedule:
- cron: '0 1 * * *'
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install requests panda snowflake-snowpark-python[pandas]
- name: Run Python Script
env:
SF_ACCOUNT: ${{ vars.SF_ACCOUNT }}
SF_USER: ${{ vars.SF_USER }}
SF_PASSWORD: ${{ secrets.SF_PASSWORD }}
SF_ROLE: ${{ vars.SF_ROLE }}
SF_WAREHOUSE: ${{ vars.SF_WAREHOUSE }}
SF_DATABASE: ${{ vars.SF_DATABASE }}
SF_SCHEMA: ${{ vars.SF_SCHEMA }}
PERSONAL_ACCESS_TOKEN: ${{ secrets.PERSONAL_ACCESS_TOKEN }}
run: python app.py
おわりに
これで毎日Oura RingのデータがSnowflakeに書き込まれていく様になりました。
2週間ほどのデータが溜まったら、可視化のためのデータモデルの作成をdbt
で実施していこうと思います。
追伸: Snowflakeの中の人へ
現時点(2023-08-06)では、このPythonのコードをSnowflake上で動かすとエラーが出る(Requestsによる外部のAPIへのアクセスが許可されてないっぽい)のですが、そのうち実行できるようになっていくんですかね?そうなるととても便利ですね。