0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks にて BigQuery から JDBC により Spark データフレームを作成する方法

Last updated at Posted at 2025-04-30

概要

Databricks にて BigQuery から JDBC により Spark データフレームを作成する方法を共有します。本記事で紹介する方法を検証目的としており、多くのケースにおいてformatオプションをbigqueryで指定して Spark コネクターを利用する方が最適です。

本記事では Serverless クラスターで実行することを想定しておりません。

本記事は下記シリーズの一部です。

image.png

出所:Databricks で BigQuery のデータを取得する方法の徹底ガイド - Qiita

事前準備

BigQuery に対する認証情報を取得

下記の記事を参考に BigQuery に対する認証情報(キー)を取得してください。

BigQuery の JDBC の完全修飾クラス名の取得

/databricks/jars配下にある BigQuery の JDBC ファイルのパスを取得します。

%sh
ls -1 /databricks/jars/*GoogleBigQueryJDBC*.jar
/databricks/jars/----ws_3_5--third_party--bigquery-jdbc--bigquery-driver-shaded---846918551--GoogleBigQueryJDBC42.jar

image.png

BigQuery の JDBC の完全修飾クラス名(bigquery.shaded.com.simba.googlebigquery.jdbc42.Driver)を取得します。

%sh
jar tf /databricks/jars/----ws_3_5--third_party--bigquery-jdbc--bigquery-driver-shaded---846918551--GoogleBigQueryJDBC42.jar | grep jdbc42/Driver.class
import zipfile
import re

jar_path = "/databricks/jars/----ws_3_5--third_party--bigquery-jdbc--bigquery-driver-shaded---846918551--GoogleBigQueryJDBC42.jar"

with zipfile.ZipFile(jar_path) as zf:
    # namelist() で JAR 内のファイル一覧を取得
    for entry in zf.namelist():
        # jdbc42/Driver.class だけを対象にする
        if entry.endswith("jdbc42/Driver.class"):
            # / → . に置換し、末尾の .class を削除
            class_name = re.sub(r"\.class$", "", entry.replace("/", "."))
            print(class_name)
bigquery.shaded.com.simba.googlebigquery.jdbc42.Driver

image.png

jdbc42という指定値は変更される可能性があり、上記コードの値も対応させる必要があります。

手順

1. OAuthPvtKeyPathプロパティを指定する手順

1-1. 認証情報を変数にセット

json_str = """{json_key}"""

image.png

1-2. Spark データフレームを作成

# Google Cloud の プロジェクト ID をセット
project_id = "axial-triode-XXXXX"

# サービスアカウントのメールアドレスをセット
service_account_mail = "dbxfederation01@axial-triode-XXXXX.iam.gserviceaccount.com"

# 取得した driver の値をセット
driver = "bigquery.shaded.com.simba.googlebigquery.jdbc42.Driver"
# データ取得元のテーブル名を設定
table_name = "bigquery-public-data.google_analytics_sample.ga_sessions_20170801"

jdbc_url = (
    "jdbc:bigquery://https://www.googleapis.com/bigquery/v2;"
    f"ProjectId={project_id};"
    "OAuthType=0;"
    f"OAuthServiceAcctEmail={service_account_mail};"
    f"OAuthPvtKey={json_str};"
    "LogLevel=0;"
    "Timeout=600;"
)

bq_df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("driver", driver)
    .option("dbtable", table_name)
    .load()
)

bq_df.limit(10).display()

image.png

image.png

Serverless compute for notebooks で実行した場合には下記のエラーが発生します。

The input query contains unsupported data source(s). Only csv, json, avro, delta, kafka, parquet, orc, text, unity_catalog, binaryFile, xml, simplescan, iceberg, mysql, postgresql, sqlserver, redshift, snowflake, sqldw, databricks, bigquery, oracle, salesforce, salesforce_data_cloud, teradata, workday_raas, mongodb data sources are supported on serverless compute, and only csv, json, avro, delta, kafka, parquet, orc, text, unity_catalog, binaryFile, xml, simplescan, iceberg data sources are allowed to run DML on serverless compute.

image.png

2. OAuthPvtKeyPathプロパティを指定する手順

2-1. Databricks Runtime のバージョンを確認

print(spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion"))

image.png

2-2. Volume 配下に認証情報(キー)を配置して、パスを取得

image.png

2-3. 認証情報のキーを FileStore のディレクトリに配置

# Volume に配置したキーのファイルパスを指定
src_key_path = "/Volumes/bq_work/default/bq-keys/axial-triode-XXXXX-1ba03e864e42.json"
local_key_path  = "/FileStore/bq/axial-triode-XXXXX-1ba03e864e42.json"

dbutils.fs.cp(src_key_path, local_key_path, True)

image.png

2-4. Spark データフレームを作成

# Google Cloud の プロジェクト ID をセット
project_id = "axial-triode-XXXXX"

# サービスアカウントのメールアドレスをセット
service_account_mail = "dbxfederation01@axial-triode-XXXXX.iam.gserviceaccount.com"

# 取得した driver の値をセット
driver = "bigquery.shaded.com.simba.googlebigquery.jdbc42.Driver"
# データ取得元のテーブル名を設定
table_name = "bigquery-public-data.google_analytics_sample.ga_sessions_20170801"

spark_key_path = "/dbfs" + local_key_path
jdbc_url = (
    "jdbc:bigquery://https://www.googleapis.com/bigquery/v2;"
    f"ProjectId={project_id};"
    "OAuthType=0;"
    f"OAuthServiceAcctEmail={service_account_mail};"
    f"OAuthPvtKeyPath={spark_key_path};"
    "LogLevel=0;"
    "Timeout=600;"
)

bq_df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("driver", driver)
    .option("dbtable", table_name)
    .load()
)

bq_df.limit(10).display()

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?