2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Oura Ringのデータを可視化する: 1 - Snowflakeにデータを格納するまで

Last updated at Posted at 2023-08-05

目次

1. この記事の内容
2. Snowflakeへ保存するデータ
3. OuraRingのAPIからデータを取得する
4. 取得したデータをSnowflakeのTableにInsertする
5. Pythonのコード全体
6. Dailyバッチ化する
7. おわりに

この記事の内容

先日Oura Ringという、バイタルデータを計測できるスマートリングを購入しました。
この様に専用のアプリで、いろんなデータを閲覧することができるのですが、データを取得できるAPIがあるので、Snowflakeへ取り込んでdbtで集計し、Lightdash or Tableauで可視化するまでをやってみようと思います。

この記事はその第一弾として、Oura RingのデータをPythonで取得し、Snowparkを使用してSnowflakeに保存するまでをやってみたものです。

Snowflakeへ保存するデータ

APIで取得するOuraRingのデータ

以下のデータをSnowflakeへ取り込んでいます。データの内容についてはまだ理解が及んでいないので後日加筆修正するかもしれません。参考

endpoint データの内容
daily_activity 毎日の活動のサマリー。1日の消費カロリー、休憩時間など。
daily_readiness その日に対する準備がどの程度整っているか。アプリの画面で閲覧できるコンディションの値や前日の睡眠の質の情報が入っているっぽい?
daily_sleep 睡眠時間のサマリー。昼寝の時間も含む、1日分の集計値。
heartrate 心拍数のデータ
sleep daily_sleepとは違い、各睡眠時間の情報が入っている (複数回昼寝したら、そのそれぞれの情報が取得できる)
workout サイクリングやランニングなどのトレーニングの情報
personal_info ユーザ情報。ユーザ識別子やメールアドレス。登録している年齢など。

Snowflakeへ保存する際のデータ構造

各endpointから得られるデータをそれぞれ個別のテーブルに格納する。ただし全て同じテーブルにしている。つまり、各データの固有の仕様に基づいた加工処理は全て dbt で実施する前提。

colmun name description
time データを取得する処理を行ったUNIXTIME。取り込み処理を行う度に変化する。
processd_date (列名が微妙ですが…)データの対象日付。同じ日付のデータを後日取り込み直してもこの日付は変わらない。
user personal_infoから取得するユーザ識別子
data APIの各endpointから取得したデータをVARIANTで格納

OuraRingのAPIからデータを取得する

Oura RingのAPIからのデータ取得についてはouraというPackageがあるのですが、今回はそれを使用せず自分でコードを書いています。一部対応していないデータがあったためです。

処理としては以下のような感じで素直に実装しています。趣味開発なのもあり、エラーハンドリングは一切していません。

  • Requestsを使ってendpointにアクセス
    • 認証はPersonalAccessTokenを使用 (ここから取得できます)
    • 取得するデータの範囲は前日の1日分のみ
  • 各endpointからのresponseをjsonにして返却する
import json
import os
import time
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests


def today():
    JST = timezone(timedelta(hours=+9), "JST")
    dt_now = datetime.now(JST)
    today = dt_now.strftime("%Y-%m-%d")
    return today


def yesterday():
    JST = timezone(timedelta(hours=+9), "JST")
    dt_now = datetime.now(JST)
    yesterday = (dt_now - timedelta(1)).strftime("%Y-%m-%d")
    return yesterday


class Oura:
    def __init__(
        self,
        personal_access_token: str,
        start_date: str = yesterday(),
        end_date: str = today(),
    ):
        self.processed_date = start_date
        self.endpoint = "https://api.ouraring.com/v2/usercollection/"
        self.headers = {"Authorization": f"Bearer {personal_access_token}"}
        self.params = {"start_date": start_date, "end_date": end_date}
        self.uid = json.loads(
            requests.request(
                "GET", self.endpoint + "personal_info", headers=self.headers
            ).text
        )["id"]

    def request_api(self, method):
        res = requests.request(
            "GET", self.endpoint + method, headers=self.headers, params=self.params
        )
        if method == "personal_info":
            data = [json.loads(res.text)]
        else:
            data = json.loads(res.text)["data"]

        return {
            "time": time.time(),
            "processed_date": self.processed_date,
            "user": self.uid,
            "data": data,
        }

取得したデータをSnowflakeのTableにInsertする

Snowparkを使用しています。pandas DataFrameとテーブル名を受け取り、そのテーブルにAppendするという単純な処理にしています。これだと再実行時にデータが重複しますが、そのあたりのハンドリングはdbtで対処します。
Snowparkを使用してデータを挿入する際、テーブルが存在しない場合はDataFrameの型に従って、テーブルが作成されます (なのでCreate Tableはしていません)。これは非常に便利ですね。
参考: snowpark.DataFrameWriter
参考: snowpark.DataFrameWriter.mode

from snowflake.snowpark.session import Session

def insert_snowflake_table(df, table_name):
    sf_params = {
        "account": os.environ["SF_ACCOUNT"],
        "user": os.environ["SF_USER"],
        "password": os.environ["SF_PASSWORD"],
        "role": os.environ["SF_ROLE"],
        "warehouse": os.environ["SF_WAREHOUSE"],
        "database": os.environ["SF_DATABASE"],
        "schema": os.environ["SF_SCHEMA"],
    }
    session = Session.builder.configs(sf_params).create()
    df = session.create_dataframe(df)
    df.write.mode("append").save_as_table(table_name)

実行部分

データを取得したendpointに順番にアクセスして、取得したデータをDataFrameにしてからSnowflakeに書き込んで言っているだけです。

def main():
    personal_access_token = os.environ["PERSONAL_ACCESS_TOKEN"]
    client = Oura(
            personal_access_token=personal_access_token,
            start_date=yesteraday(),
            end_date=today()
            )
    methods = [
        "daily_activity",
        "daily_readiness",
        "daily_sleep",
        "heartrate",
        "personal_info",
        "sleep",
        "workout",
    ]
    for method in methods:
        res = client.request_api(method)
        insert_snowflake_table(pd.DataFrame(res), method)


if __name__ == "__main__":
    main()

Pythonのコード全体

コード全体は以下のとおりです

サンプルコード
import json
import os
import time
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests
from snowflake.snowpark.session import Session


def today():
    JST = timezone(timedelta(hours=+9), "JST")
    dt_now = datetime.now(JST)
    today = dt_now.strftime("%Y-%m-%d")
    return today


def yesterday():
    JST = timezone(timedelta(hours=+9), "JST")
    dt_now = datetime.now(JST)
    yesterday = (dt_now - timedelta(1)).strftime("%Y-%m-%d")
    return yesterday


class Oura:
    def __init__(
        self,
        personal_access_token: str,
        start_date: str = yesterday(),
        end_date: str = today(),
    ):
        self.processed_date = start_date
        self.endpoint = "https://api.ouraring.com/v2/usercollection/"
        self.headers = {"Authorization": f"Bearer {personal_access_token}"}
        self.params = {"start_date": start_date, "end_date": end_date}
        self.uid = json.loads(
            requests.request(
                "GET", self.endpoint + "personal_info", headers=self.headers
            ).text
        )["id"]

    def request_api(self, method):
        res = requests.request(
            "GET", self.endpoint + method, headers=self.headers, params=self.params
        )
        if method == "personal_info":
            data = [json.loads(res.text)]
        else:
            data = json.loads(res.text)["data"]

        return {
            "time": time.time(),
            "processed_date": self.processed_date,
            "user": self.uid,
            "data": data,
        }


def insert_snowflake_table(df, table_name):
    sf_params = {
        "account": os.environ["SF_ACCOUNT"],
        "user": os.environ["SF_USER"],
        "password": os.environ["SF_PASSWORD"],
        "role": os.environ["SF_ROLE"],
        "warehouse": os.environ["SF_WAREHOUSE"],
        "database": os.environ["SF_DATABASE"],
        "schema": os.environ["SF_SCHEMA"],
    }
    session = Session.builder.configs(sf_params).create()
    df = session.create_dataframe(df)
    df.write.mode("append").save_as_table(table_name)


def main():
    personal_access_token = os.environ["PERSONAL_ACCESS_TOKEN"]
    client = Oura(
            personal_access_token=personal_access_token,
            start_date=yesterday(),
            end_date=today()
            )
    methods = [
        "daily_activity",
        "daily_readiness",
        "daily_sleep",
        "heartrate",
        "personal_info",
        "sleep",
        "workout",
    ]
    for method in methods:
        res = client.request_api(method)
        insert_snowflake_table(pd.DataFrame(res), method)


if __name__ == "__main__":
    main()

これを実行すると以下のようにSnowflake上にテーブルが作成され、データも書き込まれていきます。

作成したテーブル一覧

image.png

テーブル定義

image.png

Insertされたデータ

image.png

Dailyバッチ化する

毎日JST10時にデータを取得する様に、Github Actionsで動かすように設定しています。
定義は以下の内容です。

name: fetch_oura_ring_data

on:
  schedule:
    - cron: '0 1 * * *'

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install requests panda snowflake-snowpark-python[pandas]

      - name: Run Python Script
        env:
          SF_ACCOUNT: ${{ vars.SF_ACCOUNT }}
          SF_USER: ${{ vars.SF_USER }}
          SF_PASSWORD: ${{ secrets.SF_PASSWORD }}
          SF_ROLE: ${{ vars.SF_ROLE }}
          SF_WAREHOUSE: ${{ vars.SF_WAREHOUSE }}
          SF_DATABASE: ${{ vars.SF_DATABASE }}
          SF_SCHEMA: ${{ vars.SF_SCHEMA }}
          PERSONAL_ACCESS_TOKEN: ${{ secrets.PERSONAL_ACCESS_TOKEN }}
        run: python app.py

おわりに

これで毎日Oura RingのデータがSnowflakeに書き込まれていく様になりました。
2週間ほどのデータが溜まったら、可視化のためのデータモデルの作成をdbtで実施していこうと思います。

追伸: Snowflakeの中の人へ
現時点(2023-08-06)では、このPythonのコードをSnowflake上で動かすとエラーが出る(Requestsによる外部のAPIへのアクセスが許可されてないっぽい)のですが、そのうち実行できるようになっていくんですかね?そうなるととても便利ですね。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?