前書き
データ分析は今やビジネスにおいて不可欠な要素となっています。
そこで、GCPでデータ分析基盤を作成し、実際に身近なデータで分析をしてみました。
説明に入る前に、私のスキルセットを簡単に記載いたします。
【経験】
・Python(主にpandas)を使用したアンケート回答のデータ処理
・Python(tkinter)を使用した、デスクトップアプリの開発
・VBAを使用したデータ処理
・Bigqueryを使用した、データ分析基盤の構築(前処理は主にSQL活用)
・Google Apps Scriptを使用した、データ抽出・データ整形
【資格】
・Professional Data Engineer
概要
開発環境
・Python
・SQL
・bash
・GCP
概要
自分の健康度を、日にちごとに「1 非常に調子が悪い」~「5 非常に調子がいい」で5段階評価し、
気候や睡眠時間との関係性を調査してみました。
また、今回はGCPのサービスを多く使いたかったので、
簡単にSQL等で処理できる部分もあえてサービスを使って処理しています。
データ分析基盤の構築
早速、データ構成とGCP基盤構築手順の流れを紹介します。
データ構成
元データの設計は以下2ファイルに分けて作成しています。
・基本データ.csv
目覚ましアプリに睡眠時間が記録されているので、そちらを手入力しています。
日付 (date) | 睡眠時間(hours_of_sleep) | 健康度(health) |
---|---|---|
2024/4/1 | 8:09 | 3 |
2024/4/2 | 7:57 | 3 |
2024/4/3 | 8:00 | 3 |
・気象データ.csv
気象庁のホームページからデータをダウンロードしてきました。
日付 (date) | 降水量 (precipitation) |
---|---|
2024/4/1 | 0 |
2024/4/2 | 1.5 |
2024/4/3 | 86.5 |
データ基盤構築
GCPアーキテクチャは以下のようになります。
①②「基本データ.csv」と「気象データ.csv」をCloud SDKでコマンドを使って、
Cloud Storageにアップロードします。
③Cloud Functionsを使って、Cloud Storageに「気象データ.csv」がアップロードされるのは監視します。
※「気象データ.csv」が後にアップロードされるのでこちらを指定しています
④「気象データ.csv」がアップロードされたら、Compute EngineのVMインスタンスが起動し、
起動スクリプトにより、Cloud Storageの保存されているbashファイルとDataflowジョブが実行されます。
■Cloud Functions
import functions_framework
import os
import json
from googleapiclient import discovery
from google.oauth2 import service_account
# 認証情報を読み込む
credentials = service_account.Credentials.from_service_account_file('jsonファイルのパスを記載')
# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def hello_gcs(cloud_event):
data = cloud_event.data
try:
file_name = os.path.basename(name)
print(file_name)
target_files = ['気象データ.csv']
if file_name in target_files:
print(f"Executing script for file: {file_name}")
# Compute Engine APIを使用してVMを起動
compute = discovery.build('compute', 'v1', credentials=credentials)
project = 'プロジェクトID'
zone = 'ゾーン'
instance = 'インスタンス名'
compute.instances().start(project=project, zone=zone, instance=instance).execute()
else:
print(f"Ignored file: {file_name}")
except Exception as e:
print(f"Error processing file: {e}")
■bashファイル
#!/bin/bash
# 仮想環境をアクティベート
python3 -m venv myenv
source myenv/bin/activate
# Apache Beamをインストール
pip install apache-beam
# Google Cloud Platformの統合が必要な場合
pip install apache-beam[gcp]
# Google Cloud Platformにログイン
gcloud auth login
gcloud config set project プロジェクトID
gcloud auth application-default login
# サービスアカウントキーを取得
gsutil cp jsonパス
gcloud auth activate-service-account --key-file=service.json --quiet
# Pythonファイルをダウンロードして実行
gsutil cp gs:フォルダパス/GCS_to_Bigquery.py ./GCS_to_Bigquery.py
python3 GCS_to_Bigquery.py
# すべての処理が完了した後にVMを停止
gcloud compute instances stop インスタンス名 --zone=us-ゾーン
■Dataflowジョブ
⑤dateカラムを元に、「基本データ.csv」と「気象データ.csv」をマージして、
Bigqueryに書きこんでいます。
後から設計を変えたい時などに柔軟に対応できるようにすることと、
apache_beamを触ってみたかったので、手動でコードを書いてみました。
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
import datetime
import os
# 環境変数にサービスアカウントキーのファイルパスを設定する
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "jsonファイル"
# パイプラインオプションの設定
options = PipelineOptions(
runner='DirectRunner',
project='プロジェクト名',
job_name='merge-csv-to-bigquery',
temp_location='tempパス指定',
region='リージョン',
save_main_session=True
)
# スキーマ定義
table_schema = {
'fields': [
{'name': 'date', 'type': 'DATE', 'mode': 'REQUIRED'},
{'name': 'hours_of_sleep', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'health', 'type': 'INTEGER', 'mode': 'NULLABLE'},
{'name': 'precipitation', 'type': 'FLOAT', 'mode': 'NULLABLE'}
]
}
def run_pipeline():
with beam.Pipeline(options=options) as p:
# 基本データと気象データの読み込み
basic_data = p | 'Read Basic Data' >> beam.io.ReadFromText('gs://ファルダ/基本データ.csv', skip_header_lines=1)
weather_data = p | 'Read Weather Data' >> beam.io.ReadFromText('gs://フォルダ/気象データ.csv', skip_header_lines=1)
# CSV 形式からデータを解析し、キーと値のペアを作成
def parse_basic(line):
columns = line.split(',')
date_formatted = datetime.datetime.strptime(columns[0], '%Y/%m/%d').strftime('%Y-%m-%d')
return date_formatted, { 'hours_of_sleep': columns[1],'health': int(columns[2]) }
def parse_weather(line):
columns = line.split(',')
date_formatted = datetime.datetime.strptime(columns[0], '%Y/%m/%d').strftime('%Y-%m-%d')
return date_formatted, {'precipitation': float(columns[1])}
parsed_basic_data = basic_data | 'Parse Basic Data' >> beam.Map(parse_basic)
parsed_weather_data = weather_data | 'Parse Weather Data' >> beam.Map(parse_weather)
# データのマージ
merged_data = ({'basic_data': parsed_basic_data, 'weather_data': parsed_weather_data}
| 'Merge by Date' >> beam.CoGroupByKey()
| 'Format Data' >> beam.Map(lambda elements: {
'date': elements[0],
'hours_of_sleep': elements[1]['basic_data'][0]['hours_of_sleep'] if elements[1]['basic_data'] else None,
'health': elements[1]['basic_data'][0]['health'] if elements[1]['basic_data'] else None,
'precipitation': elements[1]['weather_data'][0]['precipitation'] if elements[1]['weather_data'] else None,
}))
merged_data | "Log Merged Data" >> beam.Map(print)
# BigQuery にデータを書き込む
merged_data | 'Write to BigQuery' >> WriteToBigQuery(
table='テーブル名',
schema=table_schema,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_TRUNCATE
)
if __name__ == '__main__':
run_pipeline()
これでBigqueryにデータを取り込めたので、
データ分析基盤の完成です。
データ分析
データ分析基盤が作成できたので、Looker Studioを使ってデータ分析をしていきたいと思います。
今回は以下の2について調べていきたいと思います。
・降水量が多い日は睡眠の質が上がり、健康度が上がるのではないか。
・睡眠時間と健康度の累積平均を元に、時系列でパフォーマンス変化を確認する。
降水量が多い日は睡眠の質が上がり、健康度が上がるのではないか。
こちらの仮説を検証するために、まずはSQLでデータ整形をしていきます。
健康度(health)は値が小さいので、比較しやすくするために降水量の値に近づけています。
SELECT
date,
CASE health
WHEN 1 THEN 20
WHEN 2 THEN 40
WHEN 3 THEN 60
WHEN 4 THEN 80
WHEN 5 THEN 100
ELSE health
END AS scaled_health,
precipitation
FROM
`テーブル名.health`
ORDER BY
date;
実際にグラフ化したデータが以下になります。
降水量が多い日は健康度が上がるという仮説を立てたのですが、
グラフを確認する限りは、全く関係なさそうです。
雨音には安眠効果があると聞いたことがあったのですが、
自分には関係なさそうでした。
睡眠時間と健康度の累積平均を元に、時系列でパフォーマンス変化を確認する。
こちらを調査するために、SQLでデータ整形をしていきます。
SELECT
date,
health,
CAST(SPLIT(hours_of_sleep, ':')[SAFE_OFFSET(0)] AS FLOAT64) +
CAST(SPLIT(hours_of_sleep, ':')[SAFE_OFFSET(1)] AS FLOAT64) / 60 AS hours_of_sleep_numeric,
AVG(CAST(SPLIT(hours_of_sleep, ':')[SAFE_OFFSET(0)] AS FLOAT64) +
CAST(SPLIT(hours_of_sleep, ':')[SAFE_OFFSET(1)] AS FLOAT64) / 60) OVER (ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_avg_sleep,
AVG(health) OVER (ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_avg_health
FROM
`テーブル.health`
ORDER BY
date;
実際にグラフ化したデータが以下になります。
睡眠時間・健康度のグラフの形が似ているので、2つの要素は連動しているようにも見えます。
また、睡眠時間・健康度の数字が共に下がってきていることがわかるので、
生活していく上で、睡眠時間について意識していこうと思います。
最後に
ここまでご覧いただきありがとうございました。
以上がGCPでデータ分析基盤を作成し、実際に身近なデータで分析をするまでの流れでした。
GCPを使えば、柔軟でスケーラビリティの高い分析基盤を作れるようになるので学習を続けていき、
ビジネスにおけるデータに基づいた意思決定やAIを支えていきたいと思います