前提
- outputには、embulk/embulk-output-bigqueryを利用します。
- ここに書いてあることはだいたいREADMEに書いています。
- 取り込み先はパーティショニングテーブル(
timePartitioning:DAY
) - データ取り込みは日時で行い、日付パーティショニングするものとします。
- embulk 0.9.17でテストした。
サンプル(.yml.liquid)
- input部分については、本記事では触れません
in:
type: gcs
project: foobar_project
bucket: foobar_bucket
path_prefix: /path/to/{{ env.EXECUTE_DATE }}/{{ env.LOGNAME }}.csv.gz
auth_method: compute_engine
application_name: foobar_app
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
skip_header_lines: 1
stop_on_invalid_record: true
out:
type: bigquery
project: foobar_project
mode: delete_in_advance
auth_method: compute_engine
location: asia-northeast1
auto_create_table: true
time_partitioning:
type: 'DAY'
require_partition_filter: true
dataset: foobar_dataset
table: foobar_table${{ env.EXECUTE_DATE }}
compression: GZIP
source_format: CSV
schema_update_options: ['ALLOW_FIELD_ADDITION']
stop_on_invalid_record: true
columns:
- {name: c0, type: string}
- {name: c1, type: long}
- {name: c2, type: string}
Output部分の解説
- mode
- (追記ではなく)冪等を実現するためには、modeは
replace
replace_backup
delete_in_advance
などがあります。テンポラリテーブルを利用している、replace
は早いのですが、勝手にカラム追加してほしいので、delete_in_advance
を使っています(後述)。
- (追記ではなく)冪等を実現するためには、modeは
- schema_update_options
- BigQueryのJobパラメータに
schemaUpdateOptions
というのがあり、side effectとして、カラムの追加をしてくれます(ALLOW_FIELD_ADDITION
は NULLABLEのカラムを追加します。ほか詳細は公式ドキュメントへ) -
schemaUpdateOptions
は残念ながらCopy Jobでは使えないので、 テンポラリテーブルをcopyするmode: replace
では使えず、mode: delete_in_advance
にしています。 - このサンプルで、元ファイル(input)にカラムが増えた場合は、outputの
columns:
に、カラムを追記すれば、BigQuery側で別途スキーマ追加の作業なしに、Embulkでのデータ取り込み時にカラム追加してくれます。
- BigQueryのJobパラメータに
- stop_on_invalid_record
- invalidなレコード(typeが違うとか)の場合に、該当レコードをskipさせずに処理全体を止めるパラメータです(デフォルトはfalseでskipしてしまう)
- データの不整合の原因になるので、あえて明示的にtrueにしています。
こんな感じでやってます。