概要
Databricks にて BigQuery から JDBC により Spark データフレームを作成する方法を共有します。本記事で紹介する方法を検証目的としており、多くのケースにおいてformat
オプションをbigquery
で指定して Spark コネクターを利用する方が最適です。
本記事では Serverless クラスターで実行することを想定しておりません。
本記事は下記シリーズの一部です。
出所:Databricks で BigQuery のデータを取得する方法の徹底ガイド - Qiita
事前準備
BigQuery に対する認証情報を取得
下記の記事を参考に BigQuery に対する認証情報(キー)を取得してください。
BigQuery の JDBC の完全修飾クラス名の取得
/databricks/jars
配下にある BigQuery の JDBC ファイルのパスを取得します。
%sh
ls -1 /databricks/jars/*GoogleBigQueryJDBC*.jar
/databricks/jars/----ws_3_5--third_party--bigquery-jdbc--bigquery-driver-shaded---846918551--GoogleBigQueryJDBC42.jar
BigQuery の JDBC の完全修飾クラス名(bigquery.shaded.com.simba.googlebigquery.jdbc42.Driver
)を取得します。
%sh
jar tf /databricks/jars/----ws_3_5--third_party--bigquery-jdbc--bigquery-driver-shaded---846918551--GoogleBigQueryJDBC42.jar | grep jdbc42/Driver.class
import zipfile
import re
jar_path = "/databricks/jars/----ws_3_5--third_party--bigquery-jdbc--bigquery-driver-shaded---846918551--GoogleBigQueryJDBC42.jar"
with zipfile.ZipFile(jar_path) as zf:
# namelist() で JAR 内のファイル一覧を取得
for entry in zf.namelist():
# jdbc42/Driver.class だけを対象にする
if entry.endswith("jdbc42/Driver.class"):
# / → . に置換し、末尾の .class を削除
class_name = re.sub(r"\.class$", "", entry.replace("/", "."))
print(class_name)
bigquery.shaded.com.simba.googlebigquery.jdbc42.Driver
jdbc42
という指定値は変更される可能性があり、上記コードの値も対応させる必要があります。
手順
1. OAuthPvtKeyPath
プロパティを指定する手順
1-1. 認証情報を変数にセット
json_str = """{json_key}"""
1-2. Spark データフレームを作成
# Google Cloud の プロジェクト ID をセット
project_id = "axial-triode-XXXXX"
# サービスアカウントのメールアドレスをセット
service_account_mail = "dbxfederation01@axial-triode-XXXXX.iam.gserviceaccount.com"
# 取得した driver の値をセット
driver = "bigquery.shaded.com.simba.googlebigquery.jdbc42.Driver"
# データ取得元のテーブル名を設定
table_name = "bigquery-public-data.google_analytics_sample.ga_sessions_20170801"
jdbc_url = (
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2;"
f"ProjectId={project_id};"
"OAuthType=0;"
f"OAuthServiceAcctEmail={service_account_mail};"
f"OAuthPvtKey={json_str};"
"LogLevel=0;"
"Timeout=600;"
)
bq_df = (
spark.read.format("jdbc")
.option("url", jdbc_url)
.option("driver", driver)
.option("dbtable", table_name)
.load()
)
bq_df.limit(10).display()
Serverless compute for notebooks で実行した場合には下記のエラーが発生します。
The input query contains unsupported data source(s). Only csv, json, avro, delta, kafka, parquet, orc, text, unity_catalog, binaryFile, xml, simplescan, iceberg, mysql, postgresql, sqlserver, redshift, snowflake, sqldw, databricks, bigquery, oracle, salesforce, salesforce_data_cloud, teradata, workday_raas, mongodb data sources are supported on serverless compute, and only csv, json, avro, delta, kafka, parquet, orc, text, unity_catalog, binaryFile, xml, simplescan, iceberg data sources are allowed to run DML on serverless compute.
2. OAuthPvtKeyPath
プロパティを指定する手順
2-1. Databricks Runtime のバージョンを確認
print(spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion"))
2-2. Volume 配下に認証情報(キー)を配置して、パスを取得
2-3. 認証情報のキーを FileStore のディレクトリに配置
# Volume に配置したキーのファイルパスを指定
src_key_path = "/Volumes/bq_work/default/bq-keys/axial-triode-XXXXX-1ba03e864e42.json"
local_key_path = "/FileStore/bq/axial-triode-XXXXX-1ba03e864e42.json"
dbutils.fs.cp(src_key_path, local_key_path, True)
2-4. Spark データフレームを作成
# Google Cloud の プロジェクト ID をセット
project_id = "axial-triode-XXXXX"
# サービスアカウントのメールアドレスをセット
service_account_mail = "dbxfederation01@axial-triode-XXXXX.iam.gserviceaccount.com"
# 取得した driver の値をセット
driver = "bigquery.shaded.com.simba.googlebigquery.jdbc42.Driver"
# データ取得元のテーブル名を設定
table_name = "bigquery-public-data.google_analytics_sample.ga_sessions_20170801"
spark_key_path = "/dbfs" + local_key_path
jdbc_url = (
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2;"
f"ProjectId={project_id};"
"OAuthType=0;"
f"OAuthServiceAcctEmail={service_account_mail};"
f"OAuthPvtKeyPath={spark_key_path};"
"LogLevel=0;"
"Timeout=600;"
)
bq_df = (
spark.read.format("jdbc")
.option("url", jdbc_url)
.option("driver", driver)
.option("dbtable", table_name)
.load()
)
bq_df.limit(10).display()