Edited at
SENSYDay 23

BigQueryでテーブルを適当に分割する

BigQuery便利ですよね。

僕は勝手に、BigQueryはSQLライクなDBの皮をかぶった分散コンピューティングツールと思っています。

今回はBigQueryで大きすぎるテーブルを超適当に分割してみます。


前提

今回はパーティションテーブル等の分割方法が使えないシーンを想定しています。

(日付時刻カラムが存在しない、偏りが激しいなど)

パーティションやシャーディングなどが可能な場合はそちらを使えばいいと思います。


大きいテーブルに対するクエリは料金がかかる

BigQueryはストレージ容量の他に、クエリするデータの容量に対して課金がされます。

そのコストはとても安くて早いですが、大きなテーブルの場合は一回のクエリコストが無視できなくなるケースもあるでしょう。

この記事はそういうけち臭い人のための記事です(すみません)。

(記事作成時点で$5 per TB とだいぶ安いですが。。。)


一つのテーブルから複数分割する場合は何回かクエリが必要

一つのテーブルを分割したいくつかのテーブルに分ける場合、何回かクエリが必要です。

そして、その容量は一つ目のテーブルの容量に応じて大きくなって行きます。


馬鹿正直に分割する人

Split a table into multiple tables in BigQuery SQL

RAND関数で区切ったり、hashで区切ったり。

# 乱数つけ

SELECT
..,
RAND() as rnd
FROM ...

# 分割位置でそれぞれのテーブルにクエリ(全体クエリ)
SELECT
...,
FROM ...
WHERE rnd >= 0.3 AND rnd < 0.4

こうすると、元テーブル容量 * (分割数 + 1) のクエリコストがかかりそうです。

工夫すればもう少し減らせそうですが、オーダーをかえるクエリが思いつきませんでした。。

なので面倒になって他の方法を考えていました。


無料オペレーションが結構便利

BigQueryにはテーブル操作系の操作・オペレーションは無料で行うことができます。

無料のオペレーション


  • データの読み込み

  • データのコピー

  • データのエクスポート

  • データセットの削除

  • テーブル、ビュー、パーティションの削除

  • メタデータ オペレーション

  • 疑似列の読み取り

  • メタテーブルの読み取り

ただし、割り当てと制限で決まっている制限回数は存在しています。


書き出しは分散できる

データのエクスポートは、テーブルの内容をGCSに色々な形式で書き出す機能ですが、大きなテーブルは自動で分散書き出しする方法がついています。

その機能で、複数のファイルへのエクスポートが可能です。

GCSへの書き出し先のファイル名はdestinationUriに指定します。

この話が、この記事の中心ですが、この名前にワイルドカードを使うことで、複数のファイルに書き出すことが可能です。

そして、ワイルドカードは一つしか使えませんが、複数ファイルパスを指定可能なので、ワイルドカードを使用した複数のファイルパスを指定することで、適当なサイズへ自動で分割保存してくれます。


  • 単一のファイル: gs:/mybucket/file.json

  • 複数のファイル


    • ワイルドカード一つ: gs:/mybucket/file-*.json

    • ワイルドカード一つのURIを複数: ['gs:/mybucket/file-0-*.json', 'gs:/mybucket/file-1-*.json']




注意点


  • ワイルドカードには番号-分割数というような数値とハイフンの組み合わせが勝手に当てはめられます。


    • 分割数は指定できません。



  • 1ファイル上限1GBになるように適当に分割されるため、テーブルサイズが小さい場合は均等に分割されずに固まったりします。


読み込みも複数ファイル読み込みに対応している

データの読み込みも分散読み込みが可能です。

GCS上のファイル指定でワイルドカードを一つまでは使用することができ、その対象ファイルをロードすることが可能です。


実現方法

自動分散書き込みと、分散数分の読み込みを行うことで、それぞれのテーブルに一回ずつクエリしたとしても元テーブル全体を一回クエリするコストと同じだけのコストで済ませることが可能です。

なぜなら、


  • 書き込み・読み込みは無料オペレーション

  • 元テーブルは書き込み(エクスポート)のみでクエリしない

  • 分割後のテーブルのサイズの合計は元テーブルと同様なので、それぞれクエリしても元テーブル分にしかならない

  • GCSの書き込み・読み込みコストもタダ(リージョンが一致しないと動かない・保存はコストがかかるがすぐ消せば大きくない)

だからです。

実現は、以下のステップを行うだけです。


  1. データのエクスポート


    • ワイルドカードURIを複数指定して、自動で分割してエクスポートします。
      c

    • 分割数分のテーブルに対して、ワイルドカードURIをそれぞれ一つずつ指定して読み込む。



  2. 行数合計の確認


    • 分割テーブルの合計行数が元テーブルの行数とあっているか確認



  3. GCSにエクスポートしたファイルの削除


    • 余分なコストを減らすために中間ファイルは削除する。




実装例

python, google-cloud-pythonライブラリでの実装例を記載します。


1. データのエクスポート

from google.cloud import bigquery

# 分割数4の場合です
split_num = 4

bq_client = bigquery.Client()
dataset_obj = bq_client.get_dataset(bq_client.dataset('my_dataset'))
table_id = 'my_table'
table_ref = dataset_obj.table(table_id)
gs_uris = ['gs://my_bucket/bq_{num}_*.avro'.format(num=i)
for i in range(split_num)]

ext_job_config = bigquery.ExtractJobConfig()
ext_job_config.destination_format = 'AVRO' # 形式は揃っていてschemaが共有できればなんでもいいです。
ext_job_config.compression = 'NONE'
extract_job = bq_client.extract_table(
source=table_ref, destination_uris=gs_uris, job_config=ext_job_config)
# wait for job (export_data_to_uris)
extract_job.result()

# 最後にschemaを保持しておく
schema = bq_client.get_table(table_ref).schema


2. データの読み込み

load_jobs = []

target_tables = []
for i, uri in enumerate(gs_uris):
new_table_id = 'new_table_{}_{}'.format(i, split_num)
load_job_config = bigquery.LoadJobConfig()
load_job_config.schema = schema
load_job_config.source_format = bigquery.SourceFormat.AVRO
load_job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE # とりあえず上書きにします

load_job = bq_client.load_table_from_uri(
source_uris=uri, destination=dataset_obj.table(new_table_id), job_config=load_job_config)
load_jobs.append(load_job)

target_tables.append(new_table_id)

# wait jobs.
for job, table in zip(load_jobs, target_tables):
# Waits for table load to complete.
job.result()


3. 行数合計の確認


source_rows = bq_client.get_table(dataset_obj.table(table_id)).num_rows
rows_sum = sum([bq_client.get_table(dataset_obj.table(i)).num_rows for i in target_tables])
assert rows_sum == source_rows, 'split tables not all rows included!'


4. GCSにエクスポートしたファイルの削除

for b in storage.Client().get_bucket('my_bucket').list_blobs(prefix='bq_'):

b.delete()


注意事項


割り当て制限

無料オペレーションの項でも書きましたが、無料オペレーションは無料の代わりに厳しい利用制限があります!

これをオーバーした場合は動作しなくなるので、この方法は使えません。


分割数や容量は厳密に指定できない

分割は自動で行われるので範囲指定等はできません。

また、容量の偏りもあり得ますし、個数も大まかに均等にはなりますが、あまり厳密な分割には向いていません。