3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

BigQueryへGCS上のCSVファイルをEmbulkで取り込む際のポイント

Last updated at Posted at 2019-11-13

前提

  • outputには、embulk/embulk-output-bigqueryを利用します。
    • ここに書いてあることはだいたいREADMEに書いています。
  • 取り込み先はパーティショニングテーブル(timePartitioning:DAY)
  • データ取り込みは日時で行い、日付パーティショニングするものとします。
  • embulk 0.9.17でテストした。

サンプル(.yml.liquid)

  • input部分については、本記事では触れません
in:
  type: gcs
  project: foobar_project
  bucket: foobar_bucket
  path_prefix: /path/to/{{ env.EXECUTE_DATE }}/{{ env.LOGNAME }}.csv.gz
  auth_method: compute_engine
  application_name: foobar_app
  decoders:
    - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    skip_header_lines: 1
    stop_on_invalid_record: true

out:
  type: bigquery
  project: foobar_project
  mode: delete_in_advance
  auth_method: compute_engine
  location: asia-northeast1
  auto_create_table: true
  time_partitioning:
    type: 'DAY'
    require_partition_filter: true
  dataset: foobar_dataset
  table: foobar_table${{ env.EXECUTE_DATE }}
  compression: GZIP
  source_format: CSV
  schema_update_options: ['ALLOW_FIELD_ADDITION']
  stop_on_invalid_record: true
  columns:
    - {name: c0, type: string}
    - {name: c1, type: long}
    - {name: c2, type: string}

Output部分の解説

  • mode
    • (追記ではなく)冪等を実現するためには、modeは replace replace_backup delete_in_advance などがあります。テンポラリテーブルを利用している、 replace は早いのですが、勝手にカラム追加してほしいので、 delete_in_advance を使っています(後述)。
  • schema_update_options
    • BigQueryのJobパラメータに schemaUpdateOptions というのがあり、side effectとして、カラムの追加をしてくれます( ALLOW_FIELD_ADDITION は NULLABLEのカラムを追加します。ほか詳細は公式ドキュメントへ)
    • schemaUpdateOptions は残念ながらCopy Jobでは使えないので、 テンポラリテーブルをcopyする mode: replace では使えず、 mode: delete_in_advance にしています。
    • このサンプルで、元ファイル(input)にカラムが増えた場合は、outputの columns: に、カラムを追記すれば、BigQuery側で別途スキーマ追加の作業なしに、Embulkでのデータ取り込み時にカラム追加してくれます。
  • stop_on_invalid_record
    • invalidなレコード(typeが違うとか)の場合に、該当レコードをskipさせずに処理全体を止めるパラメータです(デフォルトはfalseでskipしてしまう)
    • データの不整合の原因になるので、あえて明示的にtrueにしています。

こんな感じでやってます。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?