はじめに
GoogleFitのアプリで記録している歩数や移動距離といったアクテビティデータをPythonで取得するスクリプトです。
実装
環境は以下の通り。
- Python: 3.8
- google-api-python-client: 2.45.0
- oauth2client: 4.1.3
前準備
はじめにGoogle Cloud ConsoleでFitness APIを有効化し、Oauth2クライアントの認証情報を作成する。
GCPにログインをし「Fitness API」の管理コンソールに移動する。その後、遷移した先の管理画面で「有効化」を押下し、以下のように「APIが有効です」と表示されることを確認する。
有効化が完了したらAPIの認証情報の作成にすすむ。「管理」を押下し、管理画面より「認証情報」タブに遷移する。
画面中の「認証情報を作成」選択し、認証方式で「OAuthクライアントID」を選択する。
その後、「アプリケーションの種類」にディスクトップアプリを選択し、名前に任意の名称を入力する。その後「作成」を押下する。
以下のように管理画面に遷移しポップアップが表示されるので、「Jsonをダウンロード」より、Jsonを形式の認証情報を取得しておく。
jsonのダウンロードが完了したら、ファイルの名称をsecret.json
に変更する。こちらでAPIの有効化と接続に利用する認証情報を取得でき、GoogleFitAPIを利用するための前準備が完了した。
必要ライブラリのインストール
はじめに、以下のコマンドで必要ライブラリをインストールする。
pip install google-api-python-client oauth2client
実行
先ほどGCPからダウンロードしたJsonを./key/secret.json
に配置した後、以下のソースコードを作成し実行する。
以下のプログラムでは、まず--credential_path=
で指定したパスにトーク情報が保存されているか確認する。そこにトークン情報がなければ、前準備の手順でダウンロードしたsecret.json
を利用したOAuth2認証を行いトークンを作成する。
その後、トークンを利用し、FitnessApiから今日の歩数や、移動距離と言ったアクテビティ情報を取得するというものである。
import argparse
import os
import time
from datetime import datetime, timedelta
import httplib2
from apiclient.discovery import build
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
OAUTH_SCOPE = [
"https://www.googleapis.com/auth/fitness.activity.read",
"https://www.googleapis.com/auth/fitness.body.read",
"https://www.googleapis.com/auth/fitness.location.read",
"https://www.googleapis.com/auth/fitness.nutrition.read",
]
def oauth2(credential_path: str):
"""取得した認証情報を用いて、GoogleFitAPIのアクセスに必要なトークンを発行する。
Args:
credential_path (str): 作成したトークンを保存するパス。
"""
flow = flow_from_clientsecrets(
# API有効化時に取得したOAuth用のJSONファイルを指定
"./key/secret.json",
# スコープを指定
scope=OAUTH_SCOPE,
# ユーザーの認証後の、トークン受け取り方法を指定
redirect_uri="urn:ietf:wg:oauth:2.0:oob",
)
authorize_url = flow.step1_get_authorize_url()
print("下記URLをブラウザで起動してください。")
print(authorize_url)
code = input("Codeを入力してください: ").strip()
credentials = flow.step2_exchange(code)
if not os.path.exists(credential_path):
Storage(credential_path).put(credentials)
def create_apiclient(credential_path: str) -> any:
"""tokenファイルを読み込みapiclientを作成する。
Args:
credential_path (str): tokenファイルのパス。
Returns:
googleapiclient.discovery.Resource: GoogleFitAPIにリクエストするクライアント。
"""
if os.path.exists(credential_path):
credentials = Storage(credential_path).get()
else:
print("tokenが存在しません。")
http = httplib2.Http()
http = credentials.authorize(http)
credentials.refresh(http) # 認証トークンを更新する。
apiclient = build("fitness", "v1", http=http)
return apiclient
def get_google_fitness_info(
apiclient: any, start_time: datetime, end_time: datetime
) -> dict:
"""クエリをGoogleFitAPIにリクエストして1日毎のデータを取得する。
Args:
apiclient (any): 認証済みのAPIクライアント
start_time (datetime): データ取得の開始日
end_time (datetime): データ取得の終了日
Returns:
dict: 取得結果。
"""
# 日付をミリ秒に変換する。
start_unix_time_millis: int = int(time.mktime(start_time.timetuple()) * 1000)
end_unix_time_millis: int = int(time.mktime(end_time.timetuple()) * 1000)
request_body = {
"aggregateBy": [
{
"dataTypeName": "com.google.distance.delta", # 移動距離
},
{
"dataTypeName": "com.google.step_count.delta", # 歩数
},
{
"dataTypeName": "com.google.calories.expended", # 消費カロリー
},
{
"dataTypeName": "com.google.heart_minutes", # 強めの運動
},
],
"bucketByTime": { # データを集約する単位。この例の場合は1日
"durationMillis": end_unix_time_millis - start_unix_time_millis
},
"startTimeMillis": start_unix_time_millis,
"endTimeMillis": end_unix_time_millis,
}
return (
(
apiclient.users()
.dataset()
.aggregate(userId="me", body=request_body)
.execute()
)
.get("bucket")[0]
.get("dataset")
)
def main(credential_path: str, target_date_before: int = 1) -> None:
"""GoogleFitからデータを受信しGooglePubSubにデータを送信する
Args:
credential_path (str): _description_
publisher (_type_): _description_
"""
if not os.path.exists(credential_path): # クレデンシャルが存在しない場合認証を行う。
print("クレデンシャルファイルが存在しないので、OAuth2認証を行います。")
oauth2(credential_path)
apiclient = create_apiclient(credential_path=credential_path)
# 日付を設定
yesterday: datetime = datetime.today() - timedelta(days=target_date_before)
start_time: datetime = datetime(
yesterday.year, yesterday.month, yesterday.day, 0, 0, 0
)
end_time: datetime = datetime(
yesterday.year, yesterday.month, yesterday.day, 23, 59, 59
)
dataset = get_google_fitness_info(apiclient, start_time, end_time)
print(f"今日の移動距離: {dataset[0].get('point')[0].get('value')[0].get('fpVal')} m")
print(f"今日の歩数: {dataset[1].get('point')[0].get('value')[0].get('intVal')} 歩")
print(f"今日の消費カロリー: {dataset[2].get('point')[0].get('value')[0].get('fpVal')} kcal")
print(f"今日の強めの運動: {dataset[3].get('point')[0].get('value')[0].get('fpVal')} point")
if __name__ == "__main__":
# 引数の設定
parser = argparse.ArgumentParser()
parser.add_argument(
"--credential_path",
help="Googleのクレデンシャルキーのパス",
default="./key/credentials",
)
args = parser.parse_args()
main(args.credential_path, target_date_before=1)
上記のプログラムの実行結果が以下となる。歩数以外にも、移動距離や消費カロリー、強めの運動のポイントを取得できた。
$ python googlefit_getdata.py --credential_path=key/credentials
今日の移動距離: 3742.0896185040474 m
今日の歩数: 5086 歩
今日の消費カロリー: 1878.2676788921772 kcal
今日の強めの運動: 24 point
注意点
プログラムの途中で以下のように開始時刻と終了時刻をミリ秒に変換している。
start_unix_time_millis: int = int(time.mktime(start_time.timetuple()) * 1000)
end_unix_time_millis: int = int(time.mktime(end_time.timetuple()) * 1000)
これはFitnessAPIのAggregateリクエスト行うときに、集約対象の期間の指定をミリ秒で指定する必要があるための処理となっている。ちなみにミリ秒以外でリクエストを送信すると、エラーが出るわけでもなく空のレスポンスが返ってくるのみである。
ちなみに、FitenssAPIは全てミリ秒に変換して指定すれば良いというわけでなく、例えば集約せずにデータを取得するUser.dataSources.datasets
のリクエストでは、取得したいデータの時間の範囲指定するときに、ナノ秒で指定する必要がある。
FitnessAPIでは、リクエストボディの中身で同じような意味を持つキーでも、APIに応じてリクエストの際に指定するキーの名称やバリューのタイプが異なったりする。当然ではあるが利用するAPIに応じてしっかりドキュメントを確認しておきたい。