LoginSignup
5
5

More than 3 years have passed since last update.

[Case]BIツールが発行するSQLが怖いので、BigQueryのデータをDataprocに楽にコピーして使う。

Last updated at Posted at 2020-02-24

サマリー

  • BIツールの発行するチューニング無しSQLが予想外の課金を発生される恐れがあるため、参照先をBigQueryからDataprocに変更した。
  • そのために、BigQueryのデータをDataprocにコピーする際に、BigQueryコネクタを使うと楽&早かったという話です。

素材用_-_Google_Slides.png

モチベーション

  • BIツールもBigQueryに繋げておけばおk。という風潮に疑問が生じた。
  • せっかく作った昔の処理フローの供養すべく、記載しておくことにした。

背景

課題と対策

  • BIツールが発行するSQLがチューニングされてない場合があり、予想外の課金が発生しそう。
    • 実際に、GoogleDataPortalからBigQueryの操作で、1回の参照で●万近くの課金が発生した。
  • そこで、BIツールの接続先を従量課金のBigQueryから時間単価のDataprocに変更した。
    • BigQueryの演算パワーと柔軟なSQL記述は魅力的なので、演算はBigQueryで完了させる。
    • 演算終了後、BigQueryのデータをDataprocに連携して、Dataproc内のPrestoに対してBIツールを接続する。
    • ただし、GoogleDataPortalはPrestoに接続できないので、Tableau/Redash等で接続してもらう。

実際の処理について

  • Google Cloud Composer(Airflow)でジョブフローを作成。
    • 検証したバージョンは、「composer-1.7.3-airflow-1.10.2」です。
  • 当初は、BigQuery → GCS(Avroフォーマット) → Dataproc(Parquetフォーマットに変換)で対応していた。
  • 変更後は、BigQuery → Dataproc(spark-bigquery-connector)で対応した方が、楽&早かったので変更。
    • 処理taskが6→1になった。
    • 計算時間が1/2になった。

前提

  • IAMの設定
    • Google Cloud Composer用のサービスアカウントに各種roleを与える必要があります。
      • 「BigQuery データ編集者」
      • 「Composer ワーカー」
      • 「Dataproc編集者」
      • 「ストレージ オブジェクト管理者」

当初の処理フロー

  • 1. エクスポート対象のテーブルがpartitionを持っている場合で、一部パーティションだけをDataprocにコピーする場合は、エクスポート用の一時テーブルに書き出す。
    • CALCは計算日を意味しており、計算日をpartition keyとしている。
bql_template = f'SELECT * FROM `{$PROJECT_ID}.{$DATASET_ID}.{$TARGET_TABLE}` where DATE(_PARTITIONTIME)="{$CALC_YYYY-MM-DD}";'
extract_task = BigQueryOperator(
    task_id=f'bq_extract-{target_table}',
    sql=bql_template,
    destination_dataset_table=f'{$DESTINATION_PROJECT}.{$DESTINATION_DATASET}.{$TARGET_TABLE}',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
)
  • 2. 念の為、GCSのエクスポート先のファイルを削除
    • 例:airflow上の処理
clean_gcs_object_task = GoogleCloudStorageDeleteOperator(
    task_id=f'clean_gcs_object-{$TARGET_TABLE}',
    bucket_name=f'{$BUCKET_NAME"}',
    prefix=f'{$TARGET_TABLE}/avro',
)
  • 3. BQの一時テーブルをGCSにエクスポート
    • 例:airflow上の処理
export_task = BigQueryToCloudStorageOperator(
    task_id=f'bq_to_gcs-{$TARGET_TABLE}',
    source_project_dataset_table=f'{$DESTINATION_PROJECT}.{$DESTINATION_DATASET}.{$TARGET_TABLE}',
    destination_cloud_storage_uris=f'gs://{$BUCKET_NAME}/{$TARGET_TABLE}/avro/dt={$CALC_YYYYMMDD}/shard-*.avro',
    export_format='AVRO',
)
  • 4. 念の為、Avro読み込み用の一時テーブル削除
    • 例:airflow上の処理
drop_temp_table_task = DataProcHiveOperator(
    task_id=f'drop_temp_table-{$TARGET_TABLE}',
    cluster_name=f'{$CLUSTER_NAME}',
    project_id=f'{$PROJECT_ID}',
    region=f'{$CLUSTER_REGION}',
    query=f'DROP TABLE {$TARGET_TABLE}_temp;',
)
  • 5. Dataprocの外部テーブルを作成して、Avroファイルを指定する。
    • 例:airflow上の処理、Avroフォーマット用テーブル定義
      • Avroフォーマットの一時テーブルcreate文が、{GCS上のcomposer root}/sql/create_xxx.sqlにある想定。
# if the sql to create table exists in "composer's root path"/sql/ddl/xxx.sql
sql_file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "sql", "create_{}.sql".format('${TARGET_TABLE}'))
sql_template = open(sql_file_path).read()
create_temp_table_task = DataProcHiveOperator(
    task_id=f'create_temp_table-{$TARGET_TABLE}',
    cluster_name=f'{$CLUSTER_NAME}',
    project_id=f'{$PROJECT_ID}',
    region=f'{$CLUSTER_REGION}',
    query=sql_template,
)
-- sql to create Avro table
CREATE EXTERNAL TABLE {$TARGET_TABLE}_temp
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS AVRO
LOCATION 'gs://{$BUCKET_NAME}/{$TARGET_TABLE}/avro/dt={$CALC_YYYYMMDD}'
TBLPROPERTIES (
  'avro.schema.literal'='{
    "name": "{$TARGET_TABLE}_temp",
    "type": "record",
    "fields": [
      {"name":"id", "type":["long","null"]},
      {"name":"cv_time", "type":{"type":"long","logicalType":"timestamp-micros"}},
    ]
  }'
)
;
  • 6. Dataprocの外部テーブルをAvroフォーマットからParquetフォーマットに変換
    • 例:airflow上の処理、Parquetフォーマット用テーブル定義、convert用のsql
      • convert用sql文が、{GCS上のcomposer root}/sql/convert_xxx.sqlにある想定。
# if the sql to convert table exists in "composer's root path"/sql/ddl/xxx.sql
sql_file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "sql", "convert_{}.sql".format('${TARGET_TABLE}'))
sql_template = open(sql_file_path).read()
convert_parquet_task = DataProcHiveOperator(
    task_id=f'convert_parquet-{$TARGET_TABLE}',
    cluster_name=f'{$CLUSTER_NAME}',
    project_id=f'{$PROJECT_ID}',
    region=f'{$CLUSTER_REGION}',
    query=sql_template,
)
-- sql to create Parquet table
CREATE EXTERNAL TABLE {$TARGET_TABLE}
(
  id        BIGINT
  , cv_time TIMESTAMP
)
PARTITIONED BY (calc_date string)
STORED AS PARQUET
location 'gs://{$BUCKET_NAME}/{$TARGET_TABLE}/parquet/'
TBLPROPERTIES ('parquet.compression'='SNAPPY')
;
-- sql to convert AVRO format to Parquet format
INSERT OVERWRITE TABLE {$TARGET_TABLE} PARTITION (calc_date='$CALC_YYYY-MM-DD')
SELECT
  *
FROM {$TARGET_TABLE}_temp
;
面倒な点
  • BigQueryからGCSへのエクスポート時
    • BQのpartition単位でエクスポートができない。
      • partition単位でエクスポートした場合には、一度エクスポート対象のpartitionを、一時テーブルにコピーしてからGCSにエクスポートが必要。
    • Dataproc上ではParquetフォーマットの方が圧縮率・演算効率が良い。しかし、BQからのエクスポートはCSV/json/Avroフォーマットしか選択できないため、DataprocなどでAvro→Parquetの変換が必要。
  • DataprocでのAvro→Parquet変換時
    • Avroデータの読み込みのためにexternal tableの定義が必要である。ただし、Avroフォーマットの型がわかりにくいのでテーブル定義の作成に苦労する。
    • 例:BQのtimestamp型に関してAvroフォーマットに変換する際(直感的には分かりにくい、他にもarrayやstructなども分かりにくい。)。
{"name":"cv_time", "type":{"type":"long","logicalType":"timestamp-micros"}},

BigQueryコネクタに変更後

変更後の処理フロー
  • 事前準備. pyspark処理用スクリプトの準備
    • 例:airflowの処理で参照するので、cloud composer用のバケットにdeployしておく
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from pyspark.sql import SparkSession
import sys

args = sys.argv
params = {
    'project_id': args[1],
    'dataset_id': args[2],
    'target_table': args[3],
    'partition_key': args[4]
}

spark = SparkSession \
    .builder \
    .master('yarn') \
    .appName('spark-bigquery-demo') \
    .enableHiveSupport() \
    .getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = spark.sparkContext._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery.
temp_table_name = 'tmp'
tmp = spark.read.format('bigquery').option('table', f'{params["project_id"]}.{params["dataset_id"]}.{params["target_table"]}')

if params['partition_key'] != '':
    tmp.option('filter', f"_PARTITIONDATE = '{params['partition_key']}'")

tmp.load().createOrReplaceTempView(temp_table_name)

# Perform sql.
query = f'INSERT OVERWRITE TABLE {params["target_table"]}'
if params['partition_key'] != '':
    query += f" PARTITION (calc_date='{params['partition_key']}')"
query += f' SELECT * FROM {params['target_table']}'
spark.sql(query)
  • 1. BigQuery → Dataproc(spark-bigquery-connector)でデータコピー
    • 例:airflow上の処理(Parquetフォーマットのテーブルは事前に別途必要。)
      • pyspark用処理スクリプトが、{GCS上のcomposer root}/spark/import_XXX.pyにある想定。
# import Dataproc from BigQuery
import_py = os.path.join(os.path.abspath(os.path.dirname(__file__)), "spark", "import_{}.py".format('${TARGET_TABLE}'))
import_task = DataProcPySparkOperator(
    task_id=f'import-{$TARGET_TABLE}',
    cluster_name=f'{$CLUSTER_NAME}',
    project_id=f'{$PROJECT_ID}',
    region=f'{$CLUSTER_REGION}',
    main=import_py,
    dataproc_pyspark_jars=['gs://spark-lib/bigquery/spark-bigquery-latest.jar'],
    arguments=[
        {$PROJECT_ID},
        {$DATESET_ID},
        {$TARGET_TABLE},
        {$partition_key}
    ],
)

以上です、南無〜。

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5