サマリー
- BIツールの発行するチューニング無しSQLが予想外の課金を発生される恐れがあるため、参照先をBigQueryからDataprocに変更した。
- そのために、BigQueryのデータをDataprocにコピーする際に、BigQueryコネクタを使うと楽&早かったという話です。

モチベーション
- BIツールもBigQueryに繋げておけばおk。という風潮に疑問が生じた。
- せっかく作った昔の処理フローの供養すべく、記載しておくことにした。
背景
課題と対策
- BIツールが発行するSQLがチューニングされてない場合があり、予想外の課金が発生しそう。
- 実際に、GoogleDataPortalからBigQueryの操作で、1回の参照で●万近くの課金が発生した。
- そこで、BIツールの接続先を従量課金のBigQueryから時間単価のDataprocに変更した。
- BigQueryの演算パワーと柔軟なSQL記述は魅力的なので、演算はBigQueryで完了させる。
- 演算終了後、BigQueryのデータをDataprocに連携して、Dataproc内のPrestoに対してBIツールを接続する。
- ただし、GoogleDataPortalはPrestoに接続できないので、Tableau/Redash等で接続してもらう。
実際の処理について
- Google Cloud Composer(Airflow)でジョブフローを作成。
- 検証したバージョンは、「composer-1.7.3-airflow-1.10.2」です。
- 当初は、BigQuery → GCS(Avroフォーマット) → Dataproc(Parquetフォーマットに変換)で対応していた。
- 変更後は、BigQuery → Dataproc(spark-bigquery-connector)で対応した方が、楽&早かったので変更。
- 処理taskが6→1になった。
- 計算時間が1/2になった。
前提
- IAMの設定
-
Google Cloud Composer用のサービスアカウントに各種roleを与える必要があります。
- 「BigQuery データ編集者」
- 「Composer ワーカー」
- 「Dataproc編集者」
- 「ストレージ オブジェクト管理者」
-
Google Cloud Composer用のサービスアカウントに各種roleを与える必要があります。
当初の処理フロー
- 1. エクスポート対象のテーブルがpartitionを持っている場合で、一部パーティションだけをDataprocにコピーする場合は、エクスポート用の一時テーブルに書き出す。
- CALCは計算日を意味しており、計算日をpartition keyとしている。
bql_template = f'SELECT * FROM `{$PROJECT_ID}.{$DATASET_ID}.{$TARGET_TABLE}` where DATE(_PARTITIONTIME)="{$CALC_YYYY-MM-DD}";'
extract_task = BigQueryOperator(
task_id=f'bq_extract-{target_table}',
sql=bql_template,
destination_dataset_table=f'{$DESTINATION_PROJECT}.{$DESTINATION_DATASET}.{$TARGET_TABLE}',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
)
- 2. 念の為、GCSのエクスポート先のファイルを削除
- 例:airflow上の処理
clean_gcs_object_task = GoogleCloudStorageDeleteOperator(
task_id=f'clean_gcs_object-{$TARGET_TABLE}',
bucket_name=f'{$BUCKET_NAME"}',
prefix=f'{$TARGET_TABLE}/avro',
)
- 3. BQの一時テーブルをGCSにエクスポート
- 例:airflow上の処理
export_task = BigQueryToCloudStorageOperator(
task_id=f'bq_to_gcs-{$TARGET_TABLE}',
source_project_dataset_table=f'{$DESTINATION_PROJECT}.{$DESTINATION_DATASET}.{$TARGET_TABLE}',
destination_cloud_storage_uris=f'gs://{$BUCKET_NAME}/{$TARGET_TABLE}/avro/dt={$CALC_YYYYMMDD}/shard-*.avro',
export_format='AVRO',
)
- 4. 念の為、Avro読み込み用の一時テーブル削除
- 例:airflow上の処理
drop_temp_table_task = DataProcHiveOperator(
task_id=f'drop_temp_table-{$TARGET_TABLE}',
cluster_name=f'{$CLUSTER_NAME}',
project_id=f'{$PROJECT_ID}',
region=f'{$CLUSTER_REGION}',
query=f'DROP TABLE {$TARGET_TABLE}_temp;',
)
- 5. Dataprocの外部テーブルを作成して、Avroファイルを指定する。
- 例:airflow上の処理、Avroフォーマット用テーブル定義
- Avroフォーマットの一時テーブルcreate文が、{GCS上のcomposer root}/sql/create_xxx.sqlにある想定。
- 例:airflow上の処理、Avroフォーマット用テーブル定義
# if the sql to create table exists in "composer's root path"/sql/ddl/xxx.sql
sql_file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "sql", "create_{}.sql".format('${TARGET_TABLE}'))
sql_template = open(sql_file_path).read()
create_temp_table_task = DataProcHiveOperator(
task_id=f'create_temp_table-{$TARGET_TABLE}',
cluster_name=f'{$CLUSTER_NAME}',
project_id=f'{$PROJECT_ID}',
region=f'{$CLUSTER_REGION}',
query=sql_template,
)
-- sql to create Avro table
CREATE EXTERNAL TABLE {$TARGET_TABLE}_temp
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS AVRO
LOCATION 'gs://{$BUCKET_NAME}/{$TARGET_TABLE}/avro/dt={$CALC_YYYYMMDD}'
TBLPROPERTIES (
'avro.schema.literal'='{
"name": "{$TARGET_TABLE}_temp",
"type": "record",
"fields": [
{"name":"id", "type":["long","null"]},
{"name":"cv_time", "type":{"type":"long","logicalType":"timestamp-micros"}},
]
}'
)
;
- 6. Dataprocの外部テーブルをAvroフォーマットからParquetフォーマットに変換
- 例:airflow上の処理、Parquetフォーマット用テーブル定義、convert用のsql
- convert用sql文が、{GCS上のcomposer root}/sql/convert_xxx.sqlにある想定。
- 例:airflow上の処理、Parquetフォーマット用テーブル定義、convert用のsql
# if the sql to convert table exists in "composer's root path"/sql/ddl/xxx.sql
sql_file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "sql", "convert_{}.sql".format('${TARGET_TABLE}'))
sql_template = open(sql_file_path).read()
convert_parquet_task = DataProcHiveOperator(
task_id=f'convert_parquet-{$TARGET_TABLE}',
cluster_name=f'{$CLUSTER_NAME}',
project_id=f'{$PROJECT_ID}',
region=f'{$CLUSTER_REGION}',
query=sql_template,
)
-- sql to create Parquet table
CREATE EXTERNAL TABLE {$TARGET_TABLE}
(
id BIGINT
, cv_time TIMESTAMP
)
PARTITIONED BY (calc_date string)
STORED AS PARQUET
location 'gs://{$BUCKET_NAME}/{$TARGET_TABLE}/parquet/'
TBLPROPERTIES ('parquet.compression'='SNAPPY')
;
-- sql to convert AVRO format to Parquet format
INSERT OVERWRITE TABLE {$TARGET_TABLE} PARTITION (calc_date='$CALC_YYYY-MM-DD')
SELECT
*
FROM {$TARGET_TABLE}_temp
;
面倒な点
- BigQueryからGCSへのエクスポート時
- BQのpartition単位でエクスポートができない。
- partition単位でエクスポートした場合には、一度エクスポート対象のpartitionを、一時テーブルにコピーしてからGCSにエクスポートが必要。
- Dataproc上ではParquetフォーマットの方が圧縮率・演算効率が良い。しかし、BQからのエクスポートはCSV/json/Avroフォーマットしか選択できないため、DataprocなどでAvro→Parquetの変換が必要。
- BQのpartition単位でエクスポートができない。
- DataprocでのAvro→Parquet変換時
- Avroデータの読み込みのためにexternal tableの定義が必要である。ただし、Avroフォーマットの型がわかりにくいのでテーブル定義の作成に苦労する。
- 例:BQのtimestamp型に関してAvroフォーマットに変換する際(直感的には分かりにくい、他にもarrayやstructなども分かりにくい。)。
{"name":"cv_time", "type":{"type":"long","logicalType":"timestamp-micros"}},
BigQueryコネクタに変更後
- BigQueryコネクタの公式ドキュメント通りにやると試せます。
変更後の処理フロー
- 事前準備. pyspark処理用スクリプトの準備
- 例:airflowの処理で参照するので、cloud composer用のバケットにdeployしておく
# !/usr/bin/python
"""BigQuery I/O PySpark example."""
from pyspark.sql import SparkSession
import sys
args = sys.argv
params = {
'project_id': args[1],
'dataset_id': args[2],
'target_table': args[3],
'partition_key': args[4]
}
spark = SparkSession \
.builder \
.master('yarn') \
.appName('spark-bigquery-demo') \
.enableHiveSupport() \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = spark.sparkContext._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery.
temp_table_name = 'tmp'
tmp = spark.read.format('bigquery').option('table', f'{params["project_id"]}.{params["dataset_id"]}.{params["target_table"]}')
if params['partition_key'] != '':
tmp.option('filter', f"_PARTITIONDATE = '{params['partition_key']}'")
tmp.load().createOrReplaceTempView(temp_table_name)
# Perform sql.
query = f'INSERT OVERWRITE TABLE {params["target_table"]}'
if params['partition_key'] != '':
query += f" PARTITION (calc_date='{params['partition_key']}')"
query += f' SELECT * FROM {params['target_table']}'
spark.sql(query)
- 1. BigQuery → Dataproc(spark-bigquery-connector)でデータコピー
- 例:airflow上の処理(Parquetフォーマットのテーブルは事前に別途必要。)
- pyspark用処理スクリプトが、{GCS上のcomposer root}/spark/import_XXX.pyにある想定。
- 例:airflow上の処理(Parquetフォーマットのテーブルは事前に別途必要。)
# import Dataproc from BigQuery
import_py = os.path.join(os.path.abspath(os.path.dirname(__file__)), "spark", "import_{}.py".format('${TARGET_TABLE}'))
import_task = DataProcPySparkOperator(
task_id=f'import-{$TARGET_TABLE}',
cluster_name=f'{$CLUSTER_NAME}',
project_id=f'{$PROJECT_ID}',
region=f'{$CLUSTER_REGION}',
main=import_py,
dataproc_pyspark_jars=['gs://spark-lib/bigquery/spark-bigquery-latest.jar'],
arguments=[
{$PROJECT_ID},
{$DATESET_ID},
{$TARGET_TABLE},
{$partition_key}
],
)
以上です、南無〜。