Assigning a variable to _env: gave embulk a different value than I expected
+send_user_summary:
  loop>: 2
  _do:
    _export:
      # Output path of the summary data generated by Ruby
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +summary:
      require: 'tasks/user_summary'
      rb>: UserSummary.collect
    +embulk:
      _env:
        # Assign the variable's value to the environment variable to avoid path typos
        INPUT_FILEPATH: ${output_filepath}
      sh>: embulk run ./embulk/send_bq.yml.liquid
embulk/send_bq.yml.liquid
in:
  type: file
  path_prefix: {{env.INPUT_FILEPATH}}
  parser:
    type: csv
    # rest omitted
out:
  type: bigquery
  # rest omitted
2019-06-07 13:57:01.374 +0900 [INFO] (2600@[0:sample]+daily-1+send_user_summary^sub+loop-0+embulk) io.digdag.core.agent.OperatorManager: sh>: embulk run ./embulk/send_bq.yml.liquid
2019-06-07 13:57:01.708 +0900: Embulk v0.9.17
2019-06-07 13:57:02.298 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2019-06-07 13:57:04.363 +0900 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2019-06-07 13:57:06.349 +0900 [INFO] (main): Started Embulk v0.9.17
2019-06-07 13:57:09.493 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 13:57:09.530 +0900 [INFO] (0001:transaction): Listing local files at directory '/home/digdag/tmp_file' filtering filename by prefix 'file_${i}.csv'
2019-06-07 13:57:09.531 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2019-06-07 13:57:09.534 +0900 [INFO] (0001:transaction): Loading files []
2019-06-07 13:57:09.574 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=12 / tasks=0
// (snip)
2019-06-07 13:57:23.354 +0900 [INFO] (main): Committed.
2019-06-07 13:57:23.354 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
The log line
Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_${i}.csv'
shows the ${i} placeholder being used in the file path as-is, so of course
Loading files []
follows and no file is read!
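Both symptoms can be checked mechanically. The following is only a minimal sketch and not part of the original workflow: `check_embulk_log` is a hypothetical helper name, and it assumes the embulk output has been saved to a file and that the log wording matches the lines quoted above.

```shell
#!/bin/sh
# Hypothetical helper: report signs of an unexpanded digdag placeholder
# in a saved embulk log. Returns non-zero if either symptom is found.
check_embulk_log() {
  log_file="$1"
  status=0
  # A literal "${" inside the quoted prefix means the variable was never expanded.
  if grep -q "filtering filename by prefix '[^']*[\$]{" "$log_file"; then
    echo "unexpanded placeholder in path_prefix" >&2
    status=1
  fi
  # An empty file list means embulk matched no input files.
  if grep -qF 'Loading files []' "$log_file"; then
    echo "no input files matched" >&2
    status=1
  fi
  return "$status"
}
```

Calling `check_embulk_log embulk.log` after the run (the file name is an assumption) would flag the failing case before the empty load goes unnoticed.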
When the file was being loaded correctly, the workflow looked like this.
+send_user_summary:
  loop>: 2
  _do:
    _export:
      # Output path of the summary data generated by Ruby
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +summary:
      require: 'tasks/user_summary'
      rb>: UserSummary.collect
    +embulk:
      _env:
        # The same value was written out literally here
        INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
      sh>: embulk run ./embulk/send_bq.yml.liquid
2019-06-07 13:39:52.295 +0900 [INFO] (0001:transaction): Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_0.csv'
2019-06-07 13:39:52.299 +0900 [INFO] (0001:transaction): Loading files [/xxx/yyy/file_0.csv]
Investigating the combinations
Verdict | digdag | Assigned value | How embulk receives it | Prefix in the embulk log |
---|---|---|---|---|
OK | _env: | "/xxx/yyy/file_${i}.csv" | {{env.INPUT_FILEPATH}} | 'file_0.csv' |
NG | _env: | ${output_filepath} | {{env.INPUT_FILEPATH}} | 'file_${i}.csv' |
OK | _export: | "/xxx/yyy/file_${i}.csv" | {{env.INPUT_FILEPATH}} | 'file_0.csv' |
OK | _export: | ${output_filepath} | {{env.INPUT_FILEPATH}} | 'file_0.csv' |
NG | _export: | "/xxx/yyy/file_${i}.csv" | ${INPUT_FILEPATH} | '${INPUT_FILEPATH}' |
NG | _export: | ${output_filepath} | ${INPUT_FILEPATH} | '${INPUT_FILEPATH}' |
Conclusion
When using digdag + embulk (yml.liquid), it is best to define the value with _export and receive it with {{env.xxxx}}.
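As an extra safety net, the script that `sh>` calls could refuse to start embulk when the value still contains a literal `${`. This is a minimal sketch under assumptions: `validate_input_filepath` is a hypothetical function name, and the check only looks for the unexpanded-placeholder pattern seen in the logs above.

```shell
#!/bin/sh
# Hypothetical guard: fail when the value is empty or still contains a literal "${".
validate_input_filepath() {
  case "$1" in
    '')
      echo "INPUT_FILEPATH is empty" >&2
      return 1
      ;;
    *'${'*)
      echo "INPUT_FILEPATH still contains a placeholder: $1" >&2
      return 1
      ;;
  esac
  return 0
}

# Intended use inside the script run by digdag's sh> operator:
#   validate_input_filepath "$INPUT_FILEPATH" && embulk run ./embulk/send_bq.yml.liquid
```

With this in place the task fails immediately instead of finishing with "Committed." after loading nothing.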
Notes from the investigation (feel free to skip)
■ _env : assign the full literal value + embulk.liquid {{env}}
=== digdag ===
loop>: 2
_do:
  _export:
    output_filepath: "/xxx/yyy/file_${i}.csv"
  +embulk:
    _env:
      INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
    sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type: file
  path_prefix: {{env.INPUT_FILEPATH}}
=== embulk log ===
2019-06-07 12:04:40.642 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 12:04:40.683 +0900 [INFO] (0001:transaction): Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_0.csv'
■ _env : assign INPUT_FILEPATH from a variable + embulk.liquid {{env}}
=== digdag ===
loop>: 2
_do:
  _export:
    output_filepath: "/xxx/yyy/file_${i}.csv"
  +embulk:
    _env:
      INPUT_FILEPATH: ${output_filepath}
    sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type: file
  path_prefix: {{env.INPUT_FILEPATH}}
=== embulk log ===
2019-06-07 12:10:40.599 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 12:10:40.661 +0900 [INFO] (0001:transaction): Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_${i}.csv'
■ _export : assign the full literal value + embulk.liquid {{env}}
=== digdag ===
loop>: 2
_do:
  _export:
    output_filepath: "/xxx/yyy/file_${i}.csv"
  +embulk:
    _export:
      INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
    sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type: file
  path_prefix: {{env.INPUT_FILEPATH}}
=== embulk log ===
2019-06-07 13:32:19.555 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 13:32:19.592 +0900 [INFO] (0001:transaction): Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_0.csv'
■ _export : assign INPUT_FILEPATH from a variable + embulk.liquid {{env}}
=== digdag ===
loop>: 2
_do:
  _export:
    output_filepath: "/xxx/yyy/file_${i}.csv"
  +embulk:
    _export:
      INPUT_FILEPATH: ${output_filepath}
    sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type: file
  path_prefix: {{env.INPUT_FILEPATH}}
=== embulk log ===
2019-06-07 13:39:52.247 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 13:39:52.295 +0900 [INFO] (0001:transaction): Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_0.csv'
■ _export : assign the full literal value + embulk.liquid ${val}
=== digdag ===
loop>: 2
_do:
  _export:
    output_filepath: "/xxx/yyy/file_${i}.csv"
  +embulk:
    _export:
      INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
    sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type: file
  path_prefix: ${INPUT_FILEPATH}
=== embulk log ===
2019-06-07 12:19:28.619 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 12:19:28.646 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix '${INPUT_FILEPATH}'
■ _export : assign INPUT_FILEPATH from a variable + embulk.liquid ${val}
=== digdag ===
loop>: 2
_do:
  _export:
    output_filepath: "/xxx/yyy/file_${i}.csv"
  +embulk:
    _export:
      INPUT_FILEPATH: ${output_filepath}
    sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type: file
  path_prefix: ${INPUT_FILEPATH}
=== embulk log ===
2019-06-07 12:22:23.493 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 12:22:23.517 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix '${INPUT_FILEPATH}'
=============================================================================================
■ _env : assign the full literal value + embulk.yml {{env}}
=== digdag ===
loop>: 2
_do:
  _export:
    output_filepath: "/xxx/yyy/file_${i}.csv"
  +embulk:
    _env:
      INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
    sh>: embulk run ./embulk/task.yml
=== embulk ===
in:
  type: file
  path_prefix: {{env.INPUT_FILEPATH}}
=== embulk log ===
2019-06-07 12:29:00.094 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
org.embulk.exec.PartialExecutionException: org.embulk.config.ConfigException: java.lang.IllegalArgumentException: Can not deserialize instance of java.lang.String out of START_OBJECT token
■ _env : assign INPUT_FILEPATH from a variable + embulk.yml {{env}}
=== digdag ===
loop>: 2
_do:
  _export:
    output_filepath: "/xxx/yyy/file_${i}.csv"
  +embulk:
    _env:
      INPUT_FILEPATH: ${output_filepath}
    sh>: embulk run ./embulk/task.yml
=== embulk ===
in:
  type: file
  path_prefix: {{env.INPUT_FILEPATH}}
=== embulk log ===
2019-06-07 12:30:49.981 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
org.embulk.exec.PartialExecutionException: org.embulk.config.ConfigException: java.lang.IllegalArgumentException: Can not deserialize instance of java.lang.String out of START_OBJECT token
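The START_OBJECT error in the two plain .yml cases above is most likely because embulk applies Liquid only to files named *.yml.liquid, so in a plain .yml file {{env.INPUT_FILEPATH}} is parsed as a YAML mapping rather than a string. The sketch below is a rough pre-flight check under that assumption; `config_name_matches_content` is a hypothetical name, not an embulk feature.

```shell
#!/bin/sh
# Hypothetical check: a config that contains Liquid tags should be named *.yml.liquid.
config_name_matches_content() {
  config="$1"
  # Only configs that contain "{{" need the Liquid extension.
  if grep -qF '{{' "$config"; then
    case "$config" in
      *.yml.liquid)
        return 0
        ;;
      *)
        echo "$config uses {{ }} but is not a .yml.liquid file" >&2
        return 1
        ;;
    esac
  fi
  return 0
}
```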
■ _export : assign the full literal value + embulk.yml ${val}
=== digdag ===
loop>: 2
_do:
  _export:
    output_filepath: "/xxx/yyy/file_${i}.csv"
  +embulk:
    _export:
      INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
    sh>: embulk run ./embulk/task.yml
=== embulk ===
in:
  type: file
  path_prefix: ${INPUT_FILEPATH}
=== embulk log ===
2019-06-07 12:32:51.345 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 12:32:51.374 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix '${INPUT_FILEPATH}'
■ _export : assign INPUT_FILEPATH from a variable + embulk.yml ${val}
=== digdag ===
loop>: 2
_do:
  _export:
    output_filepath: "/xxx/yyy/file_${i}.csv"
  +embulk:
    _export:
      INPUT_FILEPATH: ${output_filepath}
    sh>: embulk run ./embulk/task.yml
=== embulk ===
in:
  type: file
  path_prefix: ${INPUT_FILEPATH}
=== embulk log ===
2019-06-07 13:17:18.398 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 13:17:18.428 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix '${INPUT_FILEPATH}'