背景
BigQueryデータをpythonコードで使う方法はいくつかあるが、中でも最近登場したBigQuery Storage APIは読み込み速度の点で非常に優れているらしい。
公式ページ:https://cloud.google.com/bigquery/docs/bigquery-storage-python-pandas?hl=ja
まだ新しめのAPIで、ちょっと変則的なことをしようとするとなかなかサンプルコードがなかったので残しておく。実行速度などの検証は他記事に任せることとする。
やりたいこと
BigQueryのデータをPythonで直接読み込みPandasに格納する。今回は件数が多いデータを処理したかったので条件をつけて一部データのみ取り出す。
前準備
まずは使用するライブラリをpipインストール。
conda createなどで新たに別の仮想環境を用意するのがおすすめ。(私は普通にいつも使う環境に入れてしまったが、ライブラリ同士のバージョン依存関係が壊れてしまったのかエラーを取るのが個人的にめっちゃ面倒だった)
pip install --upgrade google-cloud-bigquery[bqstorage,pandas]
実装
まずは認証情報を設定してクライアントを作成。
import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
credentials, your_project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
bqclient = bigquery.Client(
credentials=credentials,
project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=credentials
)
いよいよデータを読み込む。プロジェクトIDなどは適宜書き換えてください。
print('Loading data...')
table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "PROJECT_ID"//読み込みたいBQのあるプロジェクトID
table.dataset_id = "DATASET_ID"//読み込みたいデータセットID
table.table_id = "TASBLE_ID"//読み込みたいテーブルID
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
#読み込みたいカラム名
cols = ['col1', 'col2', 'col3']
for col in cols:
read_options.selected_fields.append(col)
#例えば「userid = "hogehoge"」という条件でとってくるデータを絞り込みたい場合
read_options.row_restriction = 'userid = "hogehoge"'
parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
table,
parent,
read_options=read_options,
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
sharding_strategy=(
bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
),
)
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)
dataframe = reader.to_dataframe(session)
#datafremeの中に読み込んだデータが入っているので、あとはよしなに。