この記事は BrainPad Advent Calendar2021 21日目の記事となります。
今回は、2年前にBigQueryのテーブルの作成方法を記載したのでより利用頻度の高いデータ操作やBQMLのモデル作成方法についてまとめておこうと思います。
BigQueryをAPI経由で使う中で「あれ?どうやって書くんだっけ?」と公式リファレンスを確認しに行く頻度が高かったものを中心に整理しておきました。
初期設定
まずは、BigQueryをPythonのAPIで使うための初期設定を書いておきます。ライブラリの読み込みやBigQueryのテーブル名などを定義しておき後々これらを使ってクエリ結果などを参照します。
GCPの同一プロジェクト内で諸々の処理を実施することを想定しているため、クレデンシャルを明示的に渡すような処理はここでは行いません。
from google.cloud import bigquery as bq
client = bq.Client()
# プロジェクト名
project_name = client.project
# データセット名
# dataset_name = "demo"
# テーブル名
# table_name = "demo_table"
データを射影(SELECT)
もっとも基本的な処理である射影のやり方をいくつか書いておきます
結果を1件づつ取り出すパターン
table_id = f"{project_name}.{dataset_name}.{table_name}"
# 射影するクエリ文を作成
query = f"SELECT * FROM {table_id} LIMIT 10;"
query_job = client.query(query=query)
for row in query_job:
print(row)
pd.DataFrameの形で結果を受け取るパターン
大規模なデータを取り出す際にいきなりデータフレームで受け取るのはやや危険ですが、まぁよくやる処理ではありますね
query_job = client.query(query=query)
# to_dataframe()のメソッドを使って取り出す
df = query_job.to_dataframe()
df.head()
処理バイト数を見積もるためdry_runを事前に実行しておくパターン
BigQueryは処理1TBに対してだいたい500円くらいの課金額(2021年現在)なので、大規模データを取り扱っていない限りそこまでナーバスになる必要はないですが、事前に処理量を見積もりたい場合にはdry_runを実行しておきましょう
# dry_runの設定をTrueにしておく
# キャッシュ利用をTrueにしておくと結果が更新されないのでFalseにしておく
dryrun_config = bq.QueryJobConfig(dry_run=True, use_query_cache=False)
# dryrunで実行
query_job = client.query(query=query, job_config=dryrun_config)
total_mb = query_job.total_bytes_processed / (1024**2)
# おおよその処理MB数を確認
print(f"This query will process {int(total_mb)} MB.")
クエリからテーブルを作成(CREATE TABLE AS)
前回の記事でも書きましたが、念のためもう一度記載しておきます。
クエリに直接CREATE TABLEを書くパターン
元々存在するテーブルを加工した結果をテーブル化したりバックアップとして取っておきたい場合などでよく利用します
# 書き込み先テーブル名
new_table_name = "new_demo"
orig_table_id = f"{project_name}.{dataset_name}.{table_name}"
new_table_id = f"{project_name}.{dataset_name}.{new_table_name}"
query = f"""
DROP TABLE IF EXISTS {new_table_id} ;
CREATE TABLE {new_table_id} AS
SELECT * FROM {orig_table_id}
;"""
query_job = client.query(query=query)
# クエリの実行
query_job.result()
単純にテーブルをコピーするパターン
データ加工などは不要で単純に別名でテーブルコピーしたい場合はこちら
new_table_name = "new_demo"
orig_table_id = f"{project_name}.{dataset_name}.{table_name}"
new_table_id = f"{project_name}.{dataset_name}.{new_table_name}"
job = client.copy_table(orig_table_id, new_table_id)
job.result()
BQMLを操作
BigQueryのいいところはBQMLも通常のクエリ処理と同じように記述できるところかと思います
ということで、BQMLを使ってARIMAモデルを作成する例を軸にモデル作成から予測算出までを一通り書いておきます。
BQMLでは時系列予測以外にも様々なモデルが用意されているので下記を参照ください
モデル作成
ARIMAモデルをBQMLを用いて作成
# ARIMAモデルを作成する上で必要なパラメータを定義
model_name = "demo_arima_model"
model_id = f"{project_name}.{dataset_name}.{model_name}"
timestamp_col = "date"
target_col = "value"
id_col = "item"
holiday_region = "US"
# ARIMAモデル用クエリを記述
query_arima = f"""
CREATE OR REPLACE MODEL `{model_id}`
OPTIONS(
MODEL_TYPE='ARIMA',
TIME_SERIES_TIMESTAMP_COL='{timestamp_col}',
TIME_SERIES_DATA_COL='{target_col}',
TIME_SERIES_ID_COL='{id_col}',
HOLIDAY_REGION='{holiday_region}'
) AS
SELECT
*
FROM
`{table_id}`
;
"""
# ARIMAモデルを作成
query_job = client.query(query=query_arima)
query_job.result()
モデル評価
モデルの精度を確認するための処理は1行で書けるのでさらっとチェックしておきましょう
# 作成したARIMAモデルを指定
model_name = "demo_arima_model"
model_id = f"{project_name}.{dataset_name}.{model_name}"
# モデル評価用クエリ
query_eval = f"SELECT * FROM ML.EVALUATE(MODEL `{model_id}`) ;"
query_job = client.query(query=query_eval)
# DataFrameとして取得
query_job.to_dataframe()
予測算出
ARIMAモデルの予測値を算出します
予測期間と信頼区間を指定可能
# 予測期間を指定
horizon_num = 30
# 信頼区間を指定
confidence_level = 0.9
# 予測用クエリ
query_predict = f"""
SELECT
*
FROM
ML.FORECAST(
MODEL `{model_id}`, STRUCT({horizon_num} AS horizon, {confidence_level} AS confidence_level)
)
;"""
query_job = client.query(query=query_predict)
# DataFrameとして取得
query_job.to_dataframe()
小ネタ
そんなに利用頻度は高くないものの、検索してもパッと出てこない処理を小ネタとしてまとめておきます
1. スキーマの情報をテーブルから取り出す
テーブルオブジェクトの属性にスキーマ情報が入っているので、同じテーブル構造の空テーブルを作りたい時はスキーマ情報だけ抜き取ってくる
# スキーマ情報を取り出したいテーブルを指定
table_id = f"{project_name}.{dataset_name}.{table_name}"
table = client.get_table(table_id)
table.schema
2. 取り出した結果を指定の件数ごとページ分割する
クエリ結果を取り出したはいいものの全件一気にチェックするのは気がひける時に、ページ分割処理を行います
table_id = f"{project_name}.{dataset_name}.{table_name}"
query = f"SELECT * FROM {table_id} LIMIT 100;"
query_job = client.query(query=query)
query_job.result()
# クエリ結果をテーブル扱いする
destination = query_job.destination
destination = client.get_table(destination)
# テーブル扱いしている処理結果から20件だけ取り出す
rows = client.list_rows(destination, max_results=20)
for row in rows:
print(row)
3. テーブル全体をデータフレームとして受け取る
データ量が多いテーブルにやると死ぬ気がしますが、できるといえばできます
table_id = f"{project_name}.{dataset_name}.{table_name}"
df = client.list_rows(table_id).to_dataframe()
4. 取得結果の合計件数を取得する
クエリした結果の合計件数はすぐ確認可能
table_id = f"{project_name}.{dataset_name}.{table_name}"
query = f"SELECT * FROM {table_id} LIMIT 100;"
query_job = client.query(query=query)
results = query_job.result()
print(f"total: {results.total_rows}")
5. 複数テーブルを1つのテーブルに格納する
日付ごとに分かれているテーブルを1つにまとめたい時などに使う処理
スキーマ構成が違うとエラーになるので注意
table_a_id = f"{project_name}.{dataset_name}.{table_a_name}"
table_b_id = f"{project_name}.{dataset_name}.{table_b_name}"
table_ids = [table_a_id, table_b_id]
new_table_id = f"{project_name}.{dataset_name}.{new_table_name}"
job = client.copy_table(table_ids, new_table_id)
job.result()
参考
上記の処理は大体公式のリファレンスに記載がありますので、困ったら公式リファレンスを見てみましょう(戒め)
BigQueryのサンプル
おしまい