Edited at

DigdagとEmbulkの連携で便利なdigdagの変数参照

More than 1 year has passed since last update.

Digdagのembulkオペレータは、Digdagで定義した参照をEmbulkの設定ファイルないで利用することができます。

下記の例は、my_pathという変数をDigdagで定義して、Embulkの設定で参照する例です。

この方法を利用することで、RubyやPythonで定義した変数をEmbulk内で利用することができるようになります。

Embulkオペレータとshオペレータそれぞれを利用することができます。


Embulkオペレータの場合

注意: Digdagのembulkオペレータは現状Liquidの機能は使えません。


ディレクトリ構成

/tmp/hoge

|
|-- config.yml # <-- Embulk設定ファイル
|-- csv
| `-- sample_01.csv.gz
|-- hoge.dig # <-- Digdagファイル


設定例

hoge.dig

timezone: UTC

_export:
my_path: "/tmp/hoge/csv"

+step1:
embulk>: ./config.yml

config.yaml

Embulkの設定は次のとおりです。path_prefix: ${my_path}のがdigdagで定義した変数を参照している部分です。

in:

type: file
path_prefix: ${my_path} # <-- これ
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
null_string: 'NULL'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out: {type: stdout}


実行例

実際に実行した結果は次のようになります。

digdag run hoge --rerun

2016-06-29 17:30:22 +0900: Digdag v0.8.2
2016-06-29 17:30:24 +0900 [WARN] (main): Reusing the last session time 2016-06-29T00:00:00+00:00.
2016-06-29 17:30:24 +0900 [INFO] (main): Using session .digdag/status/20160629T000000+0000.
2016-06-29 17:30:24 +0900 [INFO] (main): Starting a new session project id=1 workflow name=hoge session_time=2016-06-29T00:00:00+00:00
2016-06-29 17:30:24 +0900 [INFO] (0018@+hoge+step1): embulk>: ./config.yml
2016-06-29 17:30:29.404 +0900: Embulk v0.8.9
2016-06-29 17:30:31.673 +0900 [INFO] (0001:transaction): Listing local files at directory '/tmp/hoge/csv' filtering filename by prefix ''
2016-06-29 17:30:31.681 +0900 [INFO] (0001:transaction): Loading files [/tmp/hoge/csv/sample_01.csv.gz]


shオペレータの場合

shオペレータ利用する際、digdagは変数を環境変数に設定します。そのためEmbulkのLiquidの機能を使って変数を利用することができます。

注意: Digdag serverで、Liquidの{% include %}を使うことはできません(Local modeの場合は大丈夫です) 参考


ディレクトリ構成

Embulkの設定ファイル名は必ずliquidという拡張子をつけてください。

/tmp/hoge

|
|-- config.yml.liquid # <-- Embulk設定ファイル
|-- csv
| `-- sample_01.csv.gz
|-- hoge.dig # <-- Digdagファイル


設定例

timezone: UTC

_export:
my_path: "/tmp/hoge/csv"

+step1:
sh>: embulk run ./config.yml.liquid

config.yml.liquidの中身は次のとおりです。

in:

type: file
path_prefix: {{env.my_path}} # <-- 設定
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
null_string: 'NULL'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out: {type: stdout}


secretsの内容を環境変数にセットする


secret_test.dig

+test:

sh>: env
_env:
MAIL_PASS: ${secret:mail.password}

参考: Digdagでpython / ruby operator から Secret parameterを参照する


参考