はじめに
近年、健康データを活用したアプリケーションやサービスが増えています。
その中で、Fitbitは世界中で愛用されるウェアラブルデバイスであり、個人の健康データを詳細に追跡できる点が特徴です。
今回は、Fitbitの健康データを使ってデータ分析基盤を構築する方法について解説します。
なぜ作成したのか?
一言でいうとデータ分析したかったから。
私が愛用しているFitbitは、iPhoneアプリで睡眠データ等が見れるんですね。
ただし、このデータは1日1日を見ることができるのですが、1週間まとめて、1ヶ月まとめてデータを見ることができません。
また、今までのデータからより睡眠の質を上げるにはどのような指標を改善していけば良いのかをデータ分析したいというモチベーションが生まれました。
ここでFitbit APIがあるのですが、これがなんと1時間に150回しか叩くことができません(どれだけ叩きたいんだ)。
こういった理由からデータを貯める場所や仕組みが欲しい!!となり、BigQueryを使用することになりました。
使用技術
一貫性と再現性やバージョン管理の観点から全てをTerraformで作成しております。
①定期実行(Scheduler,PubSub)
Cloud Schedulerでは毎日午前1:00にPub/Subを利用して定期実行させて処理を自動化させていきます。特に運用負荷もそこま無いのでリアルタイムメッセージングサービスであるPub/Subは使用する必要性は無いのですが勉強のため使用しています。
resource "google_pubsub_topic" "trigger_topic" {
name = "fitbit-insert-topic"
}
resource "google_cloud_scheduler_job" "daily_job" {
name = "fitbit-api-job"
schedule = "0 1 * * *"
time_zone = "Asia/Tokyo"
description = "This job triggers the Cloud Function daily at 1:00 JTC."
pubsub_target {
topic_name = google_pubsub_topic.trigger_topic.id
data = base64encode("example-data")
}
}
②過去のデータ取得(CloudFunction,BigQuery)
これまでの数年分のデータがBigQueryに溜まっているのですが、これを毎回Cloud Functionで全取得していきます。1つのデータを既存テーブルに追加する方が処理が楽になるのではないかとは思いますが、この理由については後述します。
必要なクレデンシャル情報については全て環境変数として扱い、Cloud Functionに渡すようにしています。
resource "google_cloudfunctions_function" "bigquery_insert_function" {
name = "fitbit-bigquery-insert-function"
runtime = "python310"
entry_point = "append_data_to_bigquery"
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = google_storage_bucket_object.function_archive.name
environment_variables = {
FITBIT_CREDENTIAL_BUCKET = google_storage_bucket.cloud_function_bucket.name
FITBIT_CREDENTIAL_OBJECT = google_storage_bucket_object.fitbit_credential.name
FITBIT_CLIENT_ID = var.fitbit_client_id
FITBIT_CLIENT_SECRET = var.fitbit_client_secret
BIGQUERY_PROJECT_ID = var.project_id
BIGQUERY_DARASET_ID = var.bigquery_dataset_id
BIGQUERY_TABLE_ID = var.bigquery_table_id
}
event_trigger {
event_type = "google.pubsub.topic.publish"
resource = google_pubsub_topic.trigger_topic.id
}
}
resource "google_bigquery_table" "warehouse_health_sleep" {
dataset_id = google_bigquery_dataset.fitbit_analytics_dataset.dataset_id
table_id = var.bigquery_table_id
deletion_protection = false
}
resource "google_bigquery_job" "load_csv" {
job_id = "load_csv_job"
load {
destination_table {
project_id = google_bigquery_table.warehouse_health_sleep.project
dataset_id = google_bigquery_table.warehouse_health_sleep.dataset_id
table_id = google_bigquery_table.warehouse_health_sleep.table_id
}
source_uris = ["gs://${google_storage_bucket.cloud_function_bucket.name}/${google_storage_bucket_object.csv.name}"]
source_format = "CSV"
skip_leading_rows = 1
allow_quoted_newlines = true
field_delimiter = ","
quote = "\""
autodetect = true
write_disposition = "WRITE_TRUNCATE"
}
location = var.region
}
def get_bq_fitbit_df():
project_id = os.environ.get("BIGQUERY_PROJECT_ID")
dataset_id = os.environ.get("BIGQUERY_DARASET_ID")
table_id = os.environ.get("BIGQUERY_TABLE_ID")
bigquery_client = bigquery.Client()
# SQL クエリを作成
query = f"""
SELECT *
FROM `{project_id}.{dataset_id}.{table_id}`
"""
query_job = bigquery_client.query(query)
fitbit_df = query_job.to_dataframe()
return fitbit_df
③Token,RefreshToken取得(Storage)
FitbitのAPIを叩くにはTokenが必要なのですが、Tokenの有効期限は8時間と短いのでRefreshTokenを使用してTokenとRefreshTokenを更新します。詳しい説明はこちらの記事を見るのが良いと思います。
今回はこのTokenはCloud Functionで更新するのでTokenの情報はGCSにアップして更新し続けるような設定にしておきました。
def updateToken(token):
storage_client = storage.Client()
bucket = storage_client.get_bucket(os.environ.get("FITBIT_CREDENTIAL_BUCKET"))
blob = bucket.get_blob(os.environ.get("FITBIT_CREDENTIAL_OBJECT"))
json_token = json.dumps(token)
bytes_token = json_token.encode()
# GCSのオブジェクトを更新
blob.upload_from_string(bytes_token)
④前日データ取得(Fitbit API)
FitbitのAPIから前日分のデータを取得するコードを書きます。何故当日ではなく、前日のデータを取得する必要があるでしょうか?
それは前日23:59までの消費カロリーなどのデータを完全な状態で取得したかったからです。
def build_days_metrics_dict(authed_client,dates_list, activity_metrics, sleep_metrics, sleep_levels):
days_result_dict = {}
for date in dates_list:
day_metrics = []
activity_metrics = activity_metrics
activity_response = authed_client.activities(date=date)
for activity_metrics_name in activity_metrics:
try:
day_metrics.append(activity_response['summary'][activity_metrics_name])
except:
day_metrics.append(0)
sleep_metrics = sleep_metrics
sleep_response = authed_client.sleep(date=date)
for sleep_metrics_name in sleep_metrics:
try:
day_metrics.append(sleep_response["sleep"][0][sleep_metrics_name])
except:
day_metrics.append(0)
for sleep_level in sleep_levels:
try:
day_metrics.append(sleep_response['summary']['stages'][sleep_level])
except:
day_metrics.append(0)
days_result_dict[date] = day_metrics
return days_result_dict
⑤データ更新(BigQuery)
③と④で取得した過去データと最新である前日データをconcatしてデータを更新していきます。
本来であれば、冪等性を意識しないといけませんが、まだ実施できておりません。
詳しいことは以下の記事がすごく参考になります。
def append_data_to_bigquery(request, context):
project_id = os.environ.get("BIGQUERY_PROJECT_ID")
dataset_id = os.environ.get("BIGQUERY_DARASET_ID")
table_id = os.environ.get("BIGQUERY_TABLE_ID")
dataset_table_id = f"{dataset_id}.{table_id}"
fitbit_client = fitbit.Fitbit(CLIENT_ID, CLIENT_SECRET,
access_token = access_token, refresh_token = refresh_token, refresh_cb = updateToken)
# 必要なメトリクス
activity_metrics = ['duration','efficiency','min','max','name','minutes','caloriesOut','distance','steps','lightlyActiveMinutes','veryActiveMinutes','sedentaryMinutes']
sleep_metrics = ['timeInBed','minutesAwake','minutesAsleep','restlessCount','restlessDuration','minutesToFallAsleep','startTime','endTime','awakeDuration','awakeningsCount','minuteData']
sleep_levels = ['deep', 'light', 'rem', 'wake']
dates_list = build_date_list()
days_result_dict = build_days_metrics_dict(fitbit_client, dates_list, activity_metrics, sleep_metrics, sleep_levels)
days_clumns_name = activity_metrics + sleep_metrics + sleep_levels
today_fitbit_df = convert_dict_to_dataframe(days_result_dict,days_clumns_name,'date')
previous_fitbit_df = get_bq_fitbit_df()
concat_dataframe = pd.concat([previous_fitbit_df, today_fitbit_df], ignore_index=False)
pandas_gbq.to_gbq(concat_dataframe, dataset_table_id, project_id, if_exists='replace')
return 'Data has been appended to the table.', 200
⑥可視化(Looker Studio)
ここからはデータ活用フェーズです。
わざわざBigQueryでクエリを叩かなくともBIツールであるLooker Studioで最新データを可視化できれば良さそうですよね。
⑦データ分析(Colabratory)
そして最後がGoogle Colabratoryでデータ分析です。
まああるあるなのですが、欠損値や外れ値が非常に多いわけです。これらを適宜見ながらIQR法やZスコア法を使用したりして分析していきます。
データを整備できたら深いノンレム睡眠時間(全体睡眠時間に対してのノンレム睡眠時間)を伸ばしていきたいので説明変数の中で何が効いてそうなのかを調べたりします。画像は入れてはいけない変数が入っているので良くない分析結果です。
個人的な予想は激しい運動が長い1日ほどよく眠れるので、消費カロリーが何かしら影響がありそうと思っていました。しかし、特に効いて無さそうなので何が影響するのか考え直しです。
まとめ
本ブログでは、Fitbitの健康データを活用し、データ分析基盤の構築方法とその利用方法について
このデータ分析基盤を活用することで、Fitbitデータをさらに有益に利用し、個人の健康管理や睡眠の質向上に役立てることができます。毎日自動でデータを更新してくれるようになったので非常に便利です。
今回の分析では、消費カロリーが睡眠に大きな影響を与えるとは限らないことがわかりました。
今後は、さらに多角的な観点からデータを分析し、健康管理に役立つ指標を見つけていきたいです。
最後に今回のリポジトリも置いておきます。