[!NOTE]
本記事は2024年11月時点での内容になります。最新情報については公式サイトなどを確認願います。
Databricks AzureでBigQueryのテーブルを取得する
GA4のデータをBigQueryにエクスポートできるようになり、BigQueryを使う敷居もかなり低くなってきました。ですが、会員情報など他データがBigQuery以外にあることも珍しくありません。
今回、DatabricksからBigQueryにアクセスしなければならなくなったため、その手順とコードをまとめておきます。
作業手順
- GA4テーブルがあるBigQueryのプロジェクトでサービスアカウントを作成
- サービスアカウントの鍵を作成
- 鍵をNotebookファイルに記述
- Pythonファイルを実行してビュー(temp_bigquery_ga4_tables)を作成
- SQLを実行
GCPでサービスアカウントを作成
サービスアカウントの説明や作成手順自体は他でも紹介されていますので、作成手順については今回は省略させていただきます。
なお、サービスアカウントに権限が付与できるので、
- BigQuery データオーナー
- BigQuery データ編集者
- BigQuery データ閲覧者
- BigQuery ジョブユーザー
あたりを付与しておけばある程度のことは可能ですが、権限が強すぎる可能性もありますので各自で調整してください。
サービスアカウント鍵を作成
- サービスアカウントを作成後、サービスアカウント一覧から作成したアカウントをクリック
- 上部タブの「鍵」をクリックし、「キーを追加」を選択後、「新しい鍵を作成」をクリック
- 「キーのタイプ」では「JSON」を選択し、作成
- 作成後、JSONファイルをダウンロード
作成された鍵(JSONファイル)は以下のような形式になっていればOKです。
{
"type": "service_account",
"project_id": "project_id",
"private_key_id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"private_key": "-----BEGIN PRIVATE KEY-----xxxxxxxxxxxxxxxxxxx\n-----END PRIVATE KEY-----\n",
"client_email": "databricks@project_id.iam.gserviceaccount.com",
"client_id": "1234567890",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/databricks%40project_id.iam.gserviceaccount.com",
"universe_domain": "googleapis.com"
}
以上で、BigQueryでの作業は終了です。
鍵をNotebookファイルに記述
ここからはDatabricks上での作業になります。
決して望ましい方法ではありませんが、今回はとりあえず実行するのが目的なので、DatabricksのNotebookでPythonコード上に先ほど作成したサービスアカウントの鍵を記述します。
main.ipynb
%python
# Copy this cell
%pip install google-cloud-bigquery google-auth
dbutils.library.restartPython()
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from pyspark.sql import SparkSession
import json
# Service account key
service_account_info = {
"type": "service_account",
"project_id": "project_id",
"private_key_id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"private_key": "-----BEGIN PRIVATE KEY-----xxxxxxxxxxxxxxxxxxx\n-----END PRIVATE KEY-----\n",
"client_email": "databricks@project_id.iam.gserviceaccount.com",
"client_id": "1234567890",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/databricks%40project_id.iam.gserviceaccount.com",
"universe_domain": "googleapis.com"
}
credentials = service_account.Credentials.from_service_account_info(service_account_info)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)
from pyspark.sql import functions as F
# Write a query to retrieve the data from BigQuery
query = """
SELECT
* ,
FROM project_id.analytics_GA4PROPERTYID.events_20241101
;
"""
# Define the temporary view name that store the result of the query above.
view_name = "temp_bigquery_ga4_tables"
query_job = client.query(query)
results = query_job.result()
# Create a temporary SQL view
rows = [dict(row) for row in results]
df = pd.DataFrame(rows)
spark = SparkSession.builder.appName("BigQueryToDatabricks").getOrCreate()
# spark_df = spark.createDataFrame(df)
spark_df = spark_df.withColumn("event_datetime", F.to_utc_timestamp(F.from_unixtime(F.col("event_timestamp")/1000000,'yyyy-MM-dd HH:mm:ss'),'UTC'))
spark_df.createOrReplaceTempView(view_name)
コード自体は特に難しい事はありませんが、注意点として、時間の取り扱いです。BigQueryでは event_timestamp カラムをDATETIME型に変更する方法として、DATETIME(TIMESTAMP_MICROS(event_timestamp), 'Asia/Tokyo') という方法がありますが、DATETIME型にしてからDatabricksに取り込むと日時が狂ってしまいます。
そのため、spark_df = spark_df.withColumn("event_datetime", F.to_utc_timestamp(F.from_unixtime(F.col("event_timestamp")/1000000,'yyyy-MM-dd HH:mm:ss'),'UTC')) のようにevent_timestampを取り込んでDatabricks側で変換する必要があります。
Pythonファイルを実行してビューを作成
上記のPython Notebookを実行すると、temp_bigquery_ga4_tablesビューが作成されます。
SQLを実行
同じNotebook内でSQLを作成すると、BigQueryのGA4テーブルからデータを抽出できます。
%sql
SELECT *
FROM temp_bigquery_ga4_tables
ただし、DatabricksではBigQueryのSTRUCT型やARRAY型はサポートされていないので、event_params、user_property、itemsカラムあたりは抽出するのに工夫が必要です。
例:ecommerce.transaction_id
get_json_object(to_json(ecommerce), '$.transaction_id') AS transaction_id
