導入
ある時、BigQueryを使っている現場で比較的大きなデータを1ファイルにまとめて出力しないといけない場面に遭遇したのですが、BigQueryのデータエクスポートに関するドキュメントには「BigQueryは最大1GBまでのデータなら1ファイルに出力できる」と記載があるだけで、1GBを超えるデータを1ファイルにまとめる方法については特に記載がありませんでした。
その後、ネット記事を漁ってみたところstack overflowでようやくそれらしい回答を見つけたのですが、リンクが飛散していて読みにくかったことと日本語での解説がなかったことから、せっかくなので色々な手段の比較とベストアンサーの考察をこの記事で書いてみることにしました。
Googleの製品はアップデートが早いのでこの記事のベストアンサーも今後変わっていく可能性がありますが、同じような課題に直面している方の参考になれば幸いです。
準備
BQの準備
BigQueryに関しては検証用のプロジェクトを作成して下記のリンクから一般公開データセットのgithub_timelineテーブルを利用することにしました。
GCSの準備
BigQueryは単体だとリージョンをまたぐクエリの実行ができません。そこで、USリージョンにある一般公開データセットを利用する関係から、GCSは次の画像のようなUSリージョン向けの設定でバケットを作成しました。
検証
ではいよいよ検証を始めていきましょう!
EXPORT DATA
まずはシンプルにEXPORT DATA文でクエリ結果を直接GCSに書き出す方法を試してみます。
以下は一般公開データセットから適当にギリギリ1GBを超えるデータをSELECTしてGCSにエクスポートしてみた結果です。
EXPORT DATA OPTIONS (
uri = 'gs://bq-export-data-test/export/github_timeline*.csv.gz',
overwrite = true,
format = 'CSV',
field_delimiter = ',',
compression = 'GZIP',
header = true
) AS
SELECT repository_url,
repository_created_at,
repository_description,
repository_homepage,
repository_size,
repository_name,
repository_owner,
repository_pushed_at,
repository_language,
repository_organization,
repository_watchers,
repository_master_branch
FROM `bigquery-public-data.samples.github_timeline`
;
およそ3分ちょいで1.1GBのデータをファイル出力できたことは素晴らしいですが、バッチ処理や納品物作成などのケースを考えるとファイルが複数に分かれるのは少し厄介ですね...
考察①
BigQuery StudioなどでSQLを直接実行する場合、データのエクスポート先の"uri"オプションにはワイルドカードの「*」を含める必要があります。そして、BigQueryはこのワイルドカードを見つけると基本的に複数のワーカーを発行し、集計処理を並列分散することで最適化を図ります。(参考:クエリプランとタイムライン)そのため、EXPORT DATAを使う場合は、例え1GB以下のデータであっても、原則としてファイルが複数に分割されると考えておいた方がいいでしょう。
LIMIT
EXPORT DATAは原則としてデータを分割して出力すると話しましたが、原則には例外がつきもので、すべてのデータを1つのワーカーに持たせる手段があります。それは、以下のようにSELECTの結果にLIMITをかけるという方法です。
EXPORT DATA OPTIONS (
uri = 'gs://bq-export-data-test/limit/github_timeline*.csv.gz',
overwrite = true,
format = 'CSV',
field_delimiter = ',',
compression = 'GZIP',
header = true
) AS
SELECT repository_url,
repository_created_at,
repository_description,
repository_homepage,
repository_size,
repository_name,
repository_owner,
repository_pushed_at,
repository_language,
repository_organization,
repository_watchers,
repository_master_branch
FROM `bigquery-public-data.samples.github_timeline`
LIMIT 6219749 -- github_timelineのレコード数を参照
;
LIMITを付けただけなのにちゃんとファイルが1つにまとまっています!
考察②
結論として、私が現場で採用した手法はこのLIMITをかける方法でした。
理由は、EXPORT DATAの構文をそのまま使用でき、Pythonなどを介さずSQLだけで完結するためワークロードが複雑化するのを避けられるからです。
ただし、この方法は万能ではなく、出力できる行数はLIMITにあたえられる整数値の上限である9,223,372,036,854,775,807までに限られますし、カラムが多いテーブルではワーカーに割り当てられたメモリを超過する恐れもあります。
私の現場ではたまたまLIMITを使った方法がマッチしましたが、対処療法的なアプローチであることは否めないでしょう。
「Resources exceeded during query execution」への対処方法
少し余談になりますが、私が現場でLIMITをかける手法を採用したあとも実は少し試行錯誤がありました。
というのも、現場ではお客様に納品するデータを作成していたため、データを整形するためのソート処理も入っており、1つのクエリでソートとLIMITを同時に実行しようとすると「Resources exceeded during query execution: The query could not be executed in the allotted memory. Peak usage: ...」というようなエラーが発生していたのです。
これはワーカーに割り当てられたリソースが不足していたために発生したエラーだったのですが、このエラーは以下のようにソートを含む処理を一時テーブルに書き出させるなどして別のワーカーに処理を分担させることで解決できました。
CREATE OR REPLACE TEMP TABLE output AS (
SELECT repository_url,
repository_created_at,
repository_description,
repository_homepage,
repository_size,
repository_name,
repository_owner,
repository_pushed_at,
repository_language,
repository_organization,
repository_watchers,
repository_master_branch
FROM `bigquery-public-data.samples.github_timeline`
ORDER BY 1,2,8
);
EXPORT DATA OPTIONS (
uri = 'gs://bq-export-data-test/limit/github_timeline*.csv',
overwrite = true,
format = 'CSV',
field_delimiter = ',',
compression = 'GZIP',
header = true
) AS
SELECT * FROM output LIMIT 6219749;
クエリ結果をPythonで圧縮
LIMITによる方法があまりスッキリしなかったので、他の対処方法として次の考えたのが「Python上でデータを圧縮してGCSに転送する」という方法だったのですが、はっきり言ってこの方法は絶対にやってはいけません。
理由は以下の結果を見ればご理解いただけると思います。
import gzip
import io
import time
from google.cloud import bigquery
from google.cloud import storage
project_id = 'marz-bq-poc'
location = 'US'
sql = """
SELECT repository_url,
repository_created_at,
repository_description,
repository_homepage,
repository_size,
repository_name,
repository_owner,
repository_pushed_at,
repository_language,
repository_organization,
repository_watchers,
repository_master_branch
FROM `bigquery-public-data.samples.github_timeline`
"""
bucket_name = 'bq-export-data-test'
blob_name = 'compress/github_timeline.csv.gz'
start_time = time.time()
bq_client = bigquery.Client(project=project_id, location=location)
df = bq_client.query(sql).to_dataframe()
gzip_buffer = io.BytesIO()
with gzip.GzipFile(fileobj=gzip_buffer, mode='w') as gzip_file:
df.to_csv(gzip_file, index=False, sep=',')
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_filename(gzip_buffer, content_type='application/gzip')
end_time = time.time()
print(f'処理時間: {end_time - start_time}[秒]')
「処理時間: 2447.00887966156[秒]」=約40分 👀!!?
考察③
4分程度で終わっていた処理が40分にも膨れ上がった結果を踏まえて調べてみたところ、どうやらBigQueryクライアントの「bq_client.query(sql).to_dataframe()」や「bq_client.query(sql).result()」はBigQueryからクエリの結果を1行ずつ取得するイテレータを返すため、クエリ結果を圧縮する中で膨大な回数の通信が発生していたようです。
特に今回は東京リージョンに構築されたVMインスタンスからUSリージョンのクエリ結果を取得していたので、レイテンシーの影響が強く受けていたはずです。
検証は個人の環境で実行したので特に問題は起きませんでしたが、もしこんなことを本番環境でやっていたらと考えると恐ろしいですね...💦
出力ファイルをPythonで集積
クエリ結果をPythonで圧縮する方法は大失敗でしたが、SQLとPythonを組み合わせる方法は決して悪くありません。
実際、今回の検証で一番パフォーマンスがよく確実なアプローチは以下に紹介するようなSQLで出力させた複数のファイルをPythonで集積するという方法でした。
import gzip
import io
import time
from google.cloud import bigquery
from google.cloud import storage
project_id = 'marz-bq-poc'
location = 'US'
sql = """
EXPORT DATA OPTIONS (
uri = 'gs://bq-export-data-test/merge/github_timeline*.csv.gz',
overwrite = true,
format = 'CSV',
field_delimiter = ',',
compression = 'GZIP',
header = true
) AS
SELECT repository_url,
repository_created_at,
repository_description,
repository_homepage,
repository_size,
repository_name,
repository_owner,
repository_pushed_at,
repository_language,
repository_organization,
repository_watchers,
repository_master_branch
FROM `bigquery-public-data.samples.github_timeline`
"""
bucket_name = 'bq-export-data-test'
blob_prefix = 'merge/github_timeline'
output_blob_name = 'merge/github_timeline.csv.gz'
temp_output_filename = '/tmp/merge_output.csv.gz'
start_time = time.time()
bq_client = bigquery.Client(project=project_id, location=location)
bq_client.query(sql).result()
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob_list = bucket.list_blobs(prefix=blob_prefix)
merged_gzip_buffer = io.BytesIO()
with gzip.GzipFile(fileobj=merged_gzip_buffer, mode='w') as merged_gzip:
for blob in blob_list:
blob_buffer = io.BytesIO()
blob.download_to_file(blob_buffer)
blob_buffer.seek(0)
with gzip.GzipFile(fileobj=blob_buffer, mode='rb') as single_gzip_file:
merged_gzip.write(single_gzip_file.read())
output_blob = bucket.blob(output_blob_name)
merged_gzip_buffer.seek(0)
with open(temp_output_filename, 'wb') as temp_output_file:
temp_output_file.write(merged_gzip_buffer.read())
output_blob.upload_from_filename(temp_output_filename)
end_time = time.time()
print(f'処理時間: {end_time - start_time}[秒]')
処理時間: 206.2866404056549[秒] = 約3.4分
考察④
SQLで出力させたファイルをPythonで圧縮する方法は、LIMITでファイルを出力させた場合の4分より短い時間で処理が完結しており、BigQueryの並列分散処理の恩恵を受けていることがうかがえます。
ただし、この方法は画像のように複数の一時ファイルがバケットに残ってしまいますし、SQLとPythonにロジックが分散することでワークロードがやや保守しにくくなります。
個人的な意見としては、まずLIMITで確実にファイルを出力できる場合はLIMITを採用し、LIMITを使うとリソースが足りない恐れのある場合はSQLとPythonのハイブリッド方式を採用するという方針がいいのではないかと思います。
EXTRACT TABLE
ある程度結論が見えてきましたが、念のため他にもっといい方法がないか探ってみましょう。
以下はBigQueryクライアントのextract_tableメソッドを使ってSQLのEXPORT DATAに相当する処理を実行するプログラムですが、結果はサイズオーバーでエラーになるようです。
import time
from google.cloud import bigquery
project_id = 'marz-bq-poc'
location = 'US'
source_table = 'bigquery-public-data.samples.github_timeline'
bucket_name = 'bq-export-data-test'
blob_name = 'extract/github_timeline.csv.gz'
output_uri = f'gs://{bucket_name}/{blob_name}'
start_time = time.time()
bq_client = bigquery.Client(project=project_id, location=location)
job_config = bigquery.ExtractJobConfig()
job_config.destination_format = 'CSV'
job_config.compression = 'GZIP'
job_config.field_delimiter = ','
job_config.print_header = True
extract_job = bq_client.extract_table(
source_table, output_uri, job_config=job_config
)
extract_job.result()
end_time = time.time()
print(f'処理時間: {end_time - start_time}[秒]')
エラーメッセージを読んでみるとどうやら1つのファイルとして出力するにはデータが大きすぎるといているみたいですね。
そこで、output_uriにワイルドカードの「*」を入れたところ、次のように問題なくテーブルのエクスポートが完了しました。
また、エクスポートするデータを1GB以内に制限するとワイルドカードを付けなくてもテーブルデータを1つのファイルに出力できることも確認できました。
考察⑤
クライアントライブラリのextract_tableコマンドを使えば1GB以内のデータは単一のファイルに出力できそうなことは分かりました。
しかし、1GBを超えるデータのエクスポートにはワイルドカードが必須であり、これでは結局「1GB以上のデータは複数のファイルに分割される」というルールを強制されただけなので、ファイルを1つにまとめるというユースケースにはふさわしくなさそうです。
bqコマンド
最後にbqコマンドを使ったアプローチも検証してみましょう。
BQでは「BigQuery Studioはパラメータ化クエリが使えないけど、クライアントライブラリやbqコマンドでは使える」みたいなレアなケースもあるので、最後のあがきとしてダメもと次のコマンドを実行してみます。
bq extract --destination_format=CSV \
--compression=GZIP \
--field_delimiter=',' \
--print_header \
your-project-id:extract_test.github_timeline \
gs://bq-export-data-test/bq/github_timeline.csv.gz
extract_tableを使った場合と同様にサイズが大きすぎると怒られてしまいました...。
データの出力先にワイルドカードを入れてリトライしてみましょう。
bq extract --destination_format=CSV \
--compression=GZIP \
--field_delimiter=',' \
--print_header \
your-project-id:extract_test.github_timeline \
gs://bq-export-data-test/bq/github_timeline.csv*.gz
するとやはりデータのファイル出力はできるのですが、ファイルが複数に分割されてしまいます。
考察⑥
このように1GBを超えるデータを出力するにはワイルドカードが必須のようなので、extract_tableメソッドのときと同様にbqコマンドを使ったパターンも1GB以上のデータを1つのファイルにまとめて出力する用途では使えないようです。
まとめ
ここまで色々な方法でBigQueryのデータを1ファイルにまとめて出力する方法を検証してきました。
結果として、私が経験した現場では「まずLIMITを検討し、リソースが足りなければPythonで分割されたファイルをまとめる」という方針で対応できました。
ただし、各種の検証を振り返ってみるとどの方法にも一長一短があり、どんな場合にも当てはまるベストアンサーを見つけるというのは難しくもあると感じました。
もし今後みなさんがBigQueryから大きなデータを出力することになった時は、この記事の検証結果がみなさんのアプローチ選択の参考になることを祈っています。
最後までお読みいただきありました!