Help us understand the problem. What is going on with this article?

【BigQuery】pandasにBQデータの一部を高速で読み込み

背景

BigQueryデータをpythonコードで使う方法はいくつかあるが、中でも最近登場したBigQuery Storage APIは読み込み速度の点で非常に優れているらしい。
公式ページ:https://cloud.google.com/bigquery/docs/bigquery-storage-python-pandas?hl=ja

まだ新しめのAPIで、ちょっと変則的なことをしようとするとなかなかサンプルコードがなかったので残しておく。実行速度などの検証は他記事に任せることとする。

やりたいこと

BigQueryのデータをPythonで直接読み込みPandasに格納する。今回は件数が多いデータを処理したかったので条件をつけて一部データのみ取り出す。

前準備

まずは使用するライブラリをpipインストール。
conda createなどで新たに別の仮想環境を用意するのがおすすめ。(私は普通にいつも使う環境に入れてしまったが、ライブラリ同士のバージョン依存関係が壊れてしまったのかエラーを取るのが個人的にめっちゃ面倒だった)

pip install --upgrade google-cloud-bigquery[bqstorage,pandas]

実装

まずは認証情報を設定してクライアントを作成。

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

いよいよデータを読み込む。プロジェクトIDなどは適宜書き換えてください。

        print('Loading data...')
        table = bigquery_storage_v1beta1.types.TableReference()
        table.project_id = "PROJECT_ID"//読み込みたいBQのあるプロジェクトID
        table.dataset_id = "DATASET_ID"//読み込みたいデータセットID
        table.table_id = "TASBLE_ID"//読み込みたいテーブルID

        read_options = bigquery_storage_v1beta1.types.TableReadOptions()
        #読み込みたいカラム名
        cols = ['col1', 'col2', 'col3']
        for col in cols:
            read_options.selected_fields.append(col)
        #例えば「userid = "hogehoge"」という条件でとってくるデータを絞り込みたい場合
        read_options.row_restriction = 'userid = "hogehoge"'
        parent = "projects/{}".format(your_project_id)
        session = bqstorageclient.create_read_session(
            table,
            parent,
            read_options=read_options,
            format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
            sharding_strategy=(
                bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
            ),
        )
        stream = session.streams[0]
        position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
        reader = bqstorageclient.read_rows(position)

        dataframe = reader.to_dataframe(session)

        #datafremeの中に読み込んだデータが入っているので、あとはよしなに。
Gri_Gra
若手技術者です。仕事では主にデータ解析とかやっています。趣味で作るものは仕事に関係あったりなかったり。pythonや機械学習まわりのことを備忘録的に書き留めてこうと思います。 記事に修正点/改善点等あればコメントください。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away