Fast conversion from BigQuery to a Pandas DataFrame using the BigQuery Storage API

Posted at 2022-03-01

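Data stored in BigQuery is sometimes easier to work with as a Pandas DataFrame, for example in natural language processing. As the target data grows, so does the load time, and the BigQuery Storage API is reportedly a good way to speed this up. I could not find many up-to-date Japanese articles on it, so I wrote this one.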

Summary

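    from google.cloud.bigquery_storage import BigQueryReadClient, types
    from google.oauth2 import service_account
    import pyarrow as pa

    # Service account key info (placeholder values; use your own key).
    gcs_service_account = {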
      "type": "service_account",
      "project_id": "project-291031",
      "private_key_id": "464564c7f86786afsa453345dsf234vr32",
      "private_key": "-----BEGIN PRIVATE KEY-----\ndD\n-----END PRIVATE KEY-----\n",
      "client_email": "my-email-address@project-291031.iam.gserviceaccount.com",
      "client_id": "543423423542344334",
      "auth_uri": "https://accounts.google.com/o/oauth2/auth",
      "token_uri": "https://oauth2.googleapis.com/token",
      "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
      "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/d453/my-email-address@project-291031.iam.gserviceaccount.com"
    }

    credentials = service_account.Credentials.from_service_account_info(gcs_service_account)
    bqstorage_client = BigQueryReadClient(credentials=credentials)
    
    gcp_project_id = 'gcp-project-id'
    project_id = "bigquery-public-data"
    dataset_id = "new_york_trees"
    table_id = "tree_species"
    table = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"

    read_options = types.ReadSession.TableReadOptions(
        selected_fields=["species_common_name", "fall_color"]
    )

    parent = "projects/{}".format(gcp_project_id)

    requested_session = types.ReadSession(
        table=table,
        data_format=types.DataFormat.ARROW,
        read_options=read_options,
    )

    read_session = bqstorage_client.create_read_session(parent=parent, read_session=requested_session, max_stream_count=1)
    stream = read_session.streams[0]
    reader = bqstorage_client.read_rows(stream.name)

    frames = []
    for message in reader.rows().pages:
        frames.append(message.to_arrow())
    pa_table = pa.Table.from_batches(frames)
    dataframe = pa_table.to_pandas()

Process

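According to the earlier article 「BigQuery Storage APIの速度比較をやってみる」 (Trying a speed comparison of the BigQuery Storage API), plugging the BigQuery Storage API into the BigQuery client library reportedly speeds things up dramatically.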

    # Example from the referenced article; it uses the pre-2.0 v1beta1 client.
    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    query = """
    SELECT * FROM `bigquery-public-data.wikipedia.pageviews_2019`
    WHERE datehour = '2019-01-01'
    """

    # Placeholder: replace with your own GCP project ID.
    bq_client = bigquery.Client(project="your-project-id")
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()
    df_temp = bq_client.query(query).to_dataframe(bqstorage_client)

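Another earlier benchmark, 「BigQuery Storage APIを使ってみた」 (I tried the BigQuery Storage API), reports that it is about 10 times faster than both the standard client library and pandas' read_gbq.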

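According to the official documentation, the google-cloud-bigquery-storage client library appears to have moved to 2.0. I modified the official sample below for three reasons: (1) the older code has to be rewritten in the current form, (2) the official sample still feels slow, and (3) the official sample does not run when copied verbatim.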

    your_project_id = "project-for-read-session"
    from google.cloud import bigquery_storage
    from google.cloud.bigquery_storage import types
    import pandas

    bqstorageclient = bigquery_storage.BigQueryReadClient()

    project_id = "bigquery-public-data"
    dataset_id = "new_york_trees"
    table_id = "tree_species"
    table = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"

    # Select columns to read with read options. If no read options are
    # specified, the whole table is read.
    read_options = types.ReadSession.TableReadOptions(
        selected_fields=["species_common_name", "fall_color"]
    )

    parent = "projects/{}".format(your_project_id)

    requested_session = types.ReadSession(
        table=table,
        # Avro is also supported, but the Arrow data format is optimized to
        # work well with column-oriented data structures such as pandas
        # DataFrames.
        data_format=types.DataFormat.ARROW,
        read_options=read_options,
    )
    read_session = bqstorageclient.create_read_session(
        parent=parent, read_session=requested_session, max_stream_count=1,
    )

    # This example reads from only a single stream. Read from multiple streams
    # to fetch data faster. Note that the session may not contain any streams
    # if there are no rows to read.
    stream = read_session.streams[0]
    reader = bqstorageclient.read_rows(stream.name)

    # Parse all Arrow blocks and create a dataframe.
    frames = []
    for message in reader.rows().pages:
        frames.append(message.to_dataframe())
    dataframe = pandas.concat(frames)
    print(dataframe.head())

Issue a JSON key from "Service Accounts" under "IAM & Admin" in GCP, and pass credentials built from it to BigQueryReadClient.

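    from google.cloud.bigquery_storage import BigQueryReadClient
    from google.oauth2 import service_account

    # Service account key info (placeholder values; use your own key).
    gcs_service_account = {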
      "type": "service_account",
      "project_id": "project-291031",
      "private_key_id": "464564c7f86786afsa453345dsf234vr32",
      "private_key": "-----BEGIN PRIVATE KEY-----\ndD\n-----END PRIVATE KEY-----\n",
      "client_email": "my-email-address@project-291031.iam.gserviceaccount.com",
      "client_id": "543423423542344334",
      "auth_uri": "https://accounts.google.com/o/oauth2/auth",
      "token_uri": "https://oauth2.googleapis.com/token",
      "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
      "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/d453/my-email-address@project-291031.iam.gserviceaccount.com"
    }

    credentials = service_account.Credentials.from_service_account_info(gcs_service_account)
    bqstorage_client = BigQueryReadClient(credentials=credentials)
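
Embedding the private key in source code is easy to leak by accident, so one alternative is to load the same JSON from a file. The sketch below is an assumption-laden illustration: `load_service_account_info` and `REQUIRED_KEYS` are hypothetical names, the key check is only a basic sanity check, and the path falls back to the standard `GOOGLE_APPLICATION_CREDENTIALS` environment variable.

```python
import json
import os

# Fields this sketch checks for before handing the dict to google-auth (assumption).
REQUIRED_KEYS = ("client_email", "token_uri", "private_key")


def load_service_account_info(path=None):
    """Load a service account key file and return it as a dict."""
    # Fall back to the conventional environment variable when no path is given.
    path = path or os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
    with open(path, encoding="utf-8") as f:
        info = json.load(f)
    missing = [key for key in REQUIRED_KEYS if key not in info]
    if missing:
        raise ValueError(f"service account file is missing keys: {missing}")
    return info
```

The returned dict can be passed to `service_account.Credentials.from_service_account_info(...)` exactly as `gcs_service_account` is above. google-auth also provides `service_account.Credentials.from_service_account_file(path)`, which reads the file directly.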

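I have not benchmarked this properly, but calling message.to_dataframe() inside the loop looked like the obvious bottleneck. Checking the type of message shows that it is a ReadRowsPage. Besides ReadRowsPage.to_dataframe(), there is also ReadRowsPage.to_arrow(), which returns pyarrow data, and pyarrow is known for being fast, so I adopted it.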

    # Parse all Arrow blocks and create a dataframe.
    frames = []
    for message in reader.rows().pages:
        frames.append(message.to_dataframe())
    dataframe = pandas.concat(frames)
    print(dataframe.head())

to_arrow() returns a pyarrow.RecordBatch, so collecting the batches, combining them with from_batches, and converting everything at once with to_pandas() at the end seemed to make it faster.

    frames = []
    for message in reader.rows().pages:
        frames.append(message.to_arrow())
    pa_table = pa.Table.from_batches(frames)
    # These keyword arguments match pyarrow's defaults and are spelled out for clarity.
    dataframe = pa_table.to_pandas(split_blocks=False, self_destruct=False, deduplicate_objects=True, use_threads=True)

References

・BigQuery Storage APIの速度比較をやってみる (Trying a speed comparison of the BigQuery Storage API)
・Python Client for Google BigQuery Document
・BigQueryReadClient
・How to convert to Pandas
・Apache Arrow Document
・pyarrow.Table
・pyarrow.RecordBatch
・Apache Arrowの凄さを体感する (Experiencing how impressive Apache Arrow is)
