How to pass parameters from digdag to embulk (.yml.liquid)

Posted at 2019-06-07

Assigning a value to _env: through a variable gave embulk a different value than I expected

+send_user_summary:
  loop>: 2
  _do:
    _export:
      # Output path for the summary data generated by Ruby
      output_filepath: "/xxx/yyy/file_${i}.csv"

    +summary:
      require: 'tasks/user_summary'
      rb>: UserSummary.collect

    +embulk:
      _env:
        INPUT_FILEPATH: ${output_filepath}  # copy the variable into an environment variable to avoid path typos
      sh>: embulk run ./embulk/send_bq.yml.liquid
embulk/send_bq.yml.liquid
in:
  type:  file
  path_prefix: {{env.INPUT_FILEPATH}}
  parser:
    type:  csv
    # rest omitted
out:
  type: bigquery
  # rest omitted
2019-06-07 13:57:01.374 +0900 [INFO] (2600@[0:sample]+daily-1+send_user_summary^sub+loop-0+embulk) io.digdag.core.agent.OperatorManager: sh>: embulk run ./embulk/send_bq.yml.liquid
2019-06-07 13:57:01.708 +0900: Embulk v0.9.17
2019-06-07 13:57:02.298 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2019-06-07 13:57:04.363 +0900 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2019-06-07 13:57:06.349 +0900 [INFO] (main): Started Embulk v0.9.17
2019-06-07 13:57:09.493 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 13:57:09.530 +0900 [INFO] (0001:transaction): Listing local files at directory '/home/digdag/tmp_file' filtering filename by prefix 'file_${i}.csv'
2019-06-07 13:57:09.531 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2019-06-07 13:57:09.534 +0900 [INFO] (0001:transaction): Loading files []
2019-06-07 13:57:09.574 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=12 / tasks=0
// (middle of the log omitted)
2019-06-07 13:57:23.354 +0900 [INFO] (main): Committed.
2019-06-07 13:57:23.354 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
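  • Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_${i}.csv': the placeholder is used verbatim as part of the file path
  • Unsurprisingly, Loading files [] shows that no file was read!

When the file was read correctly, the workflow looked like this.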
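One way to check what embulk actually receives is to print the environment variable in the same task, just before embulk runs. The following is a minimal sketch, assuming sh> hands the command line to a shell so that && and $INPUT_FILEPATH are interpreted there. The echo is a debugging addition and is not part of the original workflow.

```yaml
+embulk:
  _env:
    INPUT_FILEPATH: ${output_filepath}
  # Debugging aid (an addition, not in the original workflow):
  # $INPUT_FILEPATH has no braces, so it is left for the shell to expand.
  sh>: echo "INPUT_FILEPATH=$INPUT_FILEPATH" && embulk run ./embulk/send_bq.yml.liquid
```

If the printed value still contains ${i}, the problem lies in the digdag assignment and not in the embulk template.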

+send_user_summary:
  loop>: 2
  _do:
    _export:
      # Output path for the summary data generated by Ruby
      output_filepath: "/xxx/yyy/file_${i}.csv"

    +summary:
      require: 'tasks/user_summary'
      rb>: UserSummary.collect

    +embulk:
      _env:
        INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"  # the same value written out as a literal
      sh>: embulk run ./embulk/send_bq.yml.liquid
2019-06-07 13:39:52.295 +0900 [INFO] (0001:transaction): Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_0.csv'
2019-06-07 13:39:52.299 +0900 [INFO] (0001:transaction): Loading files [/xxx/yyy/file_0.csv]

Investigating the combinations

| Result | digdag key | Assigned value | How embulk reads it | Prefix seen by embulk |
|---|---|---|---|---|
| OK | _env: | "/xxx/yyy/file_${i}.csv" | {{env.INPUT_FILEPATH}} | 'file_0.csv' |
| NG | _env: | ${output_filepath} | {{env.INPUT_FILEPATH}} | 'file_${i}.csv' |
| OK | _export: | "/xxx/yyy/file_${i}.csv" | {{env.INPUT_FILEPATH}} | 'file_0.csv' |
| OK | _export: | ${output_filepath} | {{env.INPUT_FILEPATH}} | 'file_0.csv' |
| NG | _export: | "/xxx/yyy/file_${i}.csv" | ${INPUT_FILEPATH} | '${INPUT_FILEPATH}' |
| NG | _export: | ${output_filepath} | ${INPUT_FILEPATH} | '${INPUT_FILEPATH}' |

Conclusion

When using digdag with embulk (yml.liquid), it is best to define the value in _export and read it on the embulk side with {{env.xxxx}}.
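Put together, the recommended combination looks roughly like the sketch below. It reuses the task names and paths from the examples in this article and mirrors the _export pattern that worked in the investigation. Treat it as an illustration, not a complete workflow.

```yaml
+send_user_summary:
  loop>: 2
  _do:
    _export:
      # Defined once; ${i} is the loop index provided by loop>
      output_filepath: "/xxx/yyy/file_${i}.csv"

    +embulk:
      _export:
        # _export (not _env) so that embulk sees the expanded path
        INPUT_FILEPATH: ${output_filepath}
      sh>: embulk run ./embulk/send_bq.yml.liquid
```

On the embulk side, send_bq.yml.liquid keeps reading the value with path_prefix: {{env.INPUT_FILEPATH}}.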

Notes from the investigation (safe to skip)

■ _env : everything assigned as a literal value + embulk.liquid {{env}}
=== digdag ===
  loop>: 2
  _do:
    _export:
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +embulk:
      _env:
        INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
      sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type:  file
  path_prefix: {{env.INPUT_FILEPATH}}
  
=== embulk log ===
2019-06-07 12:04:40.642 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 12:04:40.683 +0900 [INFO] (0001:transaction): Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_0.csv'


■ _env : INPUT_FILEPATH assigned from a variable + embulk.liquid {{env}}
=== digdag ===
  loop>: 2
  _do:
    _export:
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +embulk:
      _env:
        INPUT_FILEPATH: ${output_filepath}
      sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type:  file
  path_prefix: {{env.INPUT_FILEPATH}}
  
=== embulk log ===
2019-06-07 12:10:40.599 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 12:10:40.661 +0900 [INFO] (0001:transaction): Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_${i}.csv'



■ _export : everything assigned as a literal value + embulk.liquid {{env}}
=== digdag ===
  loop>: 2
  _do:
    _export:
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +embulk:
      _export:
        INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
      sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type:  file
  path_prefix: {{env.INPUT_FILEPATH}}
  
=== embulk log ===
2019-06-07 13:32:19.555 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 13:32:19.592 +0900 [INFO] (0001:transaction): Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_0.csv'



■ _export : INPUT_FILEPATH assigned from a variable + embulk.liquid {{env}}
=== digdag ===
  loop>: 2
  _do:
    _export:
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +embulk:
      _export:
        INPUT_FILEPATH: ${output_filepath}
      sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type:  file
  path_prefix: {{env.INPUT_FILEPATH}}
  
=== embulk log ===
2019-06-07 13:39:52.247 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 13:39:52.295 +0900 [INFO] (0001:transaction): Listing local files at directory '/xxx/yyy' filtering filename by prefix 'file_0.csv'


■ _export : everything assigned as a literal value + embulk.liquid ${val}
=== digdag ===
  loop>: 2
  _do:
    _export:
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +embulk:
      _export:
        INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
      sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type:  file
  path_prefix: ${INPUT_FILEPATH}
  
=== embulk log ===
2019-06-07 12:19:28.619 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 12:19:28.646 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix '${INPUT_FILEPATH}'


■ _export : INPUT_FILEPATH assigned from a variable + embulk.liquid ${val}
=== digdag ===
  loop>: 2
  _do:
    _export:
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +embulk:
      _export:
        INPUT_FILEPATH: ${output_filepath}
      sh>: embulk run ./embulk/task.yml.liquid
=== embulk ===
in:
  type:  file
  path_prefix: ${INPUT_FILEPATH}
  
=== embulk log ===
2019-06-07 12:22:23.493 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 12:22:23.517 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix '${INPUT_FILEPATH}'

=============================================================================================

■ _env : everything assigned as a literal value + embulk.yml {{env}}
=== digdag ===
  loop>: 2
  _do:
    _export:
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +embulk:
      _env:
        INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
      sh>: embulk run ./embulk/task.yml
=== embulk ===
in:
  type:  file
  path_prefix: {{env.INPUT_FILEPATH}}
  
=== embulk log ===
2019-06-07 12:29:00.094 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
org.embulk.exec.PartialExecutionException: org.embulk.config.ConfigException: java.lang.IllegalArgumentException: Can not deserialize instance of java.lang.String out of START_OBJECT token

■ _env : INPUT_FILEPATH assigned from a variable + embulk.yml {{env}}
=== digdag ===
  loop>: 2
  _do:
    _export:
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +embulk:
      _env:
        INPUT_FILEPATH: ${output_filepath}
      sh>: embulk run ./embulk/task.yml
=== embulk ===
in:
  type:  file
  path_prefix: {{env.INPUT_FILEPATH}}
  
=== embulk log ===
2019-06-07 12:30:49.981 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
org.embulk.exec.PartialExecutionException: org.embulk.config.ConfigException: java.lang.IllegalArgumentException: Can not deserialize instance of java.lang.String out of START_OBJECT token


■ _export : everything assigned as a literal value + embulk.yml ${val}
=== digdag ===
  loop>: 2
  _do:
    _export:
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +embulk:
      _export:
        INPUT_FILEPATH: "/xxx/yyy/file_${i}.csv"
      sh>: embulk run ./embulk/task.yml
=== embulk ===
in:
  type:  file
  path_prefix: ${INPUT_FILEPATH}
  
=== embulk log ===
2019-06-07 12:32:51.345 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 12:32:51.374 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix '${INPUT_FILEPATH}'


■ _export : INPUT_FILEPATH assigned from a variable + embulk.yml ${val}
=== digdag ===
  loop>: 2
  _do:
    _export:
      output_filepath: "/xxx/yyy/file_${i}.csv"
    +embulk:
      _export:
        INPUT_FILEPATH: ${output_filepath}
      sh>: embulk run ./embulk/task.yml
=== embulk ===
in:
  type:  file
  path_prefix: ${INPUT_FILEPATH}
  
=== embulk log ===
2019-06-07 13:17:18.398 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-07 13:17:18.428 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix '${INPUT_FILEPATH}'




