LoginSignup
3
3

More than 3 years have passed since last update.

【BigQuery】pandasにBQデータの一部を高速で読み込み

Posted at

背景

BigQueryデータをpythonコードで使う方法はいくつかあるが、中でも最近登場したBigQuery Storage APIは読み込み速度の点で非常に優れているらしい。
公式ページ:https://cloud.google.com/bigquery/docs/bigquery-storage-python-pandas?hl=ja

まだ新しめのAPIで、ちょっと変則的なことをしようとするとなかなかサンプルコードがなかったので残しておく。実行速度などの検証は他記事に任せることとする。

やりたいこと

BigQueryのデータをPythonで直接読み込みPandasに格納する。今回は件数が多いデータを処理したかったので条件をつけて一部データのみ取り出す。

前準備

まずは使用するライブラリをpipインストール。
conda createなどで新たに別の仮想環境を用意するのがおすすめ。(私は普通にいつも使う環境に入れてしまったが、ライブラリ同士のバージョン依存関係が壊れてしまったのかエラーを取るのが個人的にめっちゃ面倒だった)

pip install --upgrade google-cloud-bigquery[bqstorage,pandas]

実装

まずは認証情報を設定してクライアントを作成。

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

いよいよデータを読み込む。プロジェクトIDなどは適宜書き換えてください。

        print('Loading data...')
        table = bigquery_storage_v1beta1.types.TableReference()
        table.project_id = "PROJECT_ID"//読み込みたいBQのあるプロジェクトID
        table.dataset_id = "DATASET_ID"//読み込みたいデータセットID
        table.table_id = "TASBLE_ID"//読み込みたいテーブルID

        read_options = bigquery_storage_v1beta1.types.TableReadOptions()
        #読み込みたいカラム名
        cols = ['col1', 'col2', 'col3']
        for col in cols:
            read_options.selected_fields.append(col)
        #例えば「userid = "hogehoge"」という条件でとってくるデータを絞り込みたい場合
        read_options.row_restriction = 'userid = "hogehoge"'
        parent = "projects/{}".format(your_project_id)
        session = bqstorageclient.create_read_session(
            table,
            parent,
            read_options=read_options,
            format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
            sharding_strategy=(
                bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
            ),
        )
        stream = session.streams[0]
        position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
        reader = bqstorageclient.read_rows(position)

        dataframe = reader.to_dataframe(session)

        #datafremeの中に読み込んだデータが入っているので、あとはよしなに。
3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3