Digdagのembulkオペレータは、Digdagで定義した参照をEmbulkの設定ファイルないで利用することができます。
下記の例は、my_path
という変数をDigdagで定義して、Embulkの設定で参照する例です。
この方法を利用することで、RubyやPythonで定義した変数をEmbulk内で利用することができるようになります。
Embulkオペレータとshオペレータそれぞれを利用することができます。
Embulkオペレータの場合
注意: Digdagのembulkオペレータは現状Liquidの機能は使えません。
ディレクトリ構成
/tmp/hoge
|
|-- config.yml # <-- Embulk設定ファイル
|-- csv
| `-- sample_01.csv.gz
|-- hoge.dig # <-- Digdagファイル
設定例
hoge.dig
timezone: UTC
_export:
my_path: "/tmp/hoge/csv"
+step1:
embulk>: ./config.yml
config.yaml
Embulkの設定は次のとおりです。path_prefix: ${my_path}
のがdigdagで定義した変数を参照している部分です。
in:
type: file
path_prefix: ${my_path} # <-- これ
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
null_string: 'NULL'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out: {type: stdout}
実行例
実際に実行した結果は次のようになります。
digdag run hoge --rerun
2016-06-29 17:30:22 +0900: Digdag v0.8.2
2016-06-29 17:30:24 +0900 [WARN] (main): Reusing the last session time 2016-06-29T00:00:00+00:00.
2016-06-29 17:30:24 +0900 [INFO] (main): Using session .digdag/status/20160629T000000+0000.
2016-06-29 17:30:24 +0900 [INFO] (main): Starting a new session project id=1 workflow name=hoge session_time=2016-06-29T00:00:00+00:00
2016-06-29 17:30:24 +0900 [INFO] (0018@+hoge+step1): embulk>: ./config.yml
2016-06-29 17:30:29.404 +0900: Embulk v0.8.9
2016-06-29 17:30:31.673 +0900 [INFO] (0001:transaction): Listing local files at directory '/tmp/hoge/csv' filtering filename by prefix ''
2016-06-29 17:30:31.681 +0900 [INFO] (0001:transaction): Loading files [/tmp/hoge/csv/sample_01.csv.gz]
shオペレータの場合
shオペレータ利用する際、digdagは変数を環境変数に設定します。そのためEmbulkのLiquidの機能を使って変数を利用することができます。
注意: Digdag serverで、Liquidの{% include %}
を使うことはできません(Local modeの場合は大丈夫です) 参考
ディレクトリ構成
Embulkの設定ファイル名は必ずliquidという拡張子をつけてください。
/tmp/hoge
|
|-- config.yml.liquid # <-- Embulk設定ファイル
|-- csv
| `-- sample_01.csv.gz
|-- hoge.dig # <-- Digdagファイル
設定例
timezone: UTC
_export:
my_path: "/tmp/hoge/csv"
+step1:
sh>: embulk run ./config.yml.liquid
config.yml.liquidの中身は次のとおりです。
in:
type: file
path_prefix: {{env.my_path}} # <-- 設定
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
null_string: 'NULL'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out: {type: stdout}
secretsの内容を環境変数にセットする
+test:
sh>: env
_env:
MAIL_PASS: ${secret:mail.password}
参考: Digdagでpython / ruby operator から Secret parameterを参照する