今回試してみたこと
BigQueryStudioの標準機能である「PySparkプロシージャ」を利用して、BigQueryのデータをPySparkで操作してみました.
事前準備
PySpark利用にあたってのAPI有効化、接続作成、権限付与は以下の公式ドキュメントを参考に事前実施.
今回使うデータ
BigQueryの公開データセットである2016年のメジャーリーグの試合スケジュールデータを自身のprojectにコピーして利用.
↓今回使用しているデータセットへのリンク
なお、公開データセットを利用する場合は以下を参照
実際に試してみた
右上に「Apache Sparkの接続が必要です」という警告が出るため、以下画像のPySparkのオプション設定画面にて、事前に準備したSparkの接続を設定.
今回は2016年のメジャーリーグのスケジュールから「ドジャース」の「2016年08月」データのみを抽出して、新しいテーブルを作成する簡単な処理を実装.
PySparkコード
from pyspark.sql import SparkSession
# SparkSessionを生成
spark = SparkSession \
.builder \
.getOrCreate()
# 一時データを保存するバケットを指定
bucket = "pyspark-sample-20240720"
spark.conf.set('temporaryGcsBucket', bucket)
# BigQueryからデータをロードしてDataFrameで格納
schedules = spark.read.format('bigquery') \
.option('table', 'abstract-ring-404322.sample_data.schedules') \
.load()
# 2016年10月に開催されたドジャースの試合に絞る
dodgers_schedules_october = schedules \
.filter((schedules["homeTeamName"] == "Dodgers") \
& (schedules["startTime"] >= "2016-08-01 00:00:00") \
& (schedules["startTime"] < "2016-09-01 00:00:00") \
)
# BigQueryにロード
dodgers_schedules_october.write.format('bigquery') \
.option('table', 'abstract-ring-404322.sample_data.dodgers_schedules_october') \
.save()
ログエクスプローラーから以下のエラーログを確認.
データセットへのアクセス権がないということなので、コンソールのデータセット名横の3点リーダーから「共有>権限の管理」で今回Sparkの接続に利用しているサービスアカウントへの権限を付与.
再実行の結果、正常終了.
裏でDataprocのインスタンスが立ち上がってSparkの処理をしている都合上、簡単な処理であっても多少のオーバーヘッドを要することがわかった.
期待通り新規テーブルが作成され、内容も「ドジャース」の「2016年08月」データが抽出されたものとなっていた.
終わりに
BigQuery Studio上でPySparkを使用してデータ操作を実施してみた.
設定から実行まで非常に簡単にできるため、PySparkでBigQuery上のデータを操作したい場合には非常に素晴らしい機能だと感じた.
またBigQuery上に保存したPySparkコードはストアドプロシージャとして呼び出すことが可能なため、作成した処理をスケジュール実行することも可能.
今回は簡単な処理の実行をしただけであったが、PySparkを利用することによってクエリでは実行が難しい複雑な条件分岐などの実装をすることができるため、BigQueryで複雑な処理を実施したいケースなどでは特に有効かと思うのでぜひ.