1
2

More than 1 year has passed since last update.

Digdagワークフローでフロー内変数を使う上でハマったこと

Posted at

はじめに

今回はプライベートの技術ではなく、仕事で使ってるものでの内容。とある業務にてワークフロー・ジョブシステムとしてDigdagを採用してシステム構築している。
初めて使うソフトなので慣れていないせいもあるが、検索してヒットするDigdag関連の記事には今回の内容に触れている記事が見当たらなかったため、忘備録として残しておく。

実現したいこと

メインとなるPythonスクリプトにて、エラーが発生したらその内容を捕捉し、digdag.env.storeにてフロー内変数に格納する。
_errorパラメータにて別Pythonを呼び出し、格納したフロー内変数を参照し、エラー内容をログにまとめて書いたりGmailAPIを使ってエラー通知などをしたかった。

フローとスクリプト

Digdag自体の使い方は別の方々の記事に譲るとして、ここでは問題点のみ挙げる。実装したフローとスクリプトは簡単にするとだいたいこのような内容である。

test01.dig
_export:
  not_found_file: False
  errmsg: " "

+test01:
  py>: hoge.foo.script01

_error:
  +err01:
    echo>: "error です"
  +err02:
    py>: hoge.foo.sendgmail 
test02.dig
_export:
  not_found_file: False
  errmsg: " "

+test01:
  py>: hoge.foo.script02
  chkpath: "D:\\myfile\\data.csv"

+chk01:
  if>: ${not_found_file}
    _do:
      fail>: "JOB [test02] don't find data.csv !!"

_error:
  +err01:
    echo>: "error です"
  +err02:
    py>: hoge.foo.sendgmail 
foo.py
def script01():
    try:
        #なにか処理
    except Exception as e:
        digdag.env.store({"not_found_file",True})
        digdag.env.store({"errmsg":"script01でエラー"})
        raise Exception(e)

def script02(chkpath):
    not_found_file = False
    if os.path.exists(chkpath) == False:
        not_found_file = True
        digdag.env.store({"errmsg":"script02でエラー"})
    digdag.env.store({"not_found_file",not_found_file})

def script03():
    try:
        #なにか処理
        sys.exit(0)
    except Exception as e:
        digdag.env.store({"not_found_file",True})
        digdag.env.store({"errmsg":"script03でエラー"})
        sys.exit(1)

def sendgmail():
    print(digdag.env.params)
    errmsg = digdag.env.params["errmsg"]
    #Gmail APIを呼び出す処理(例)
    gmailcall(to, subject, body+"\n"+errmsg) 

現象

上記のようなフローとスクリプトがあったとする。

いずれの例も、最終的にはエラー時にそれまでのスクリプトで補足したエラー内容をエラー通知の処理に受け渡したいものとする。

このfoo.pyの中で、フロー内の変数に正常に値が変化してセットされるものとされないものがある。

変数に正常に値が変化してセットされるケース

  • foo.script02

この処理では、パスの存在チェックをし、フロー内変数 not_found_file にTrue/Falseをセットして終えている。
そしてフローの次のタスク +chk01: では not_found_file を判定し、真だったら fail>: オペレータでエラーハンドリングをしている。

Python内でのエラー判定はあくまでも bool型にセットしているのみである。エラーの実際のハンドリングはフローにまかせている。

当然、 not_found_file にTrue/Falseがセットされる前提なので、そもそもできないと話にならない。
chkpath が示すCSVファイルを置いたり削除してワークフローを実行した結果、それぞれの動作が期待できる。

変数に値をセットしても変化しない・あるいは初期値に戻ってしまうケース

  • foo.script01
  • foo.script03

この2つはPythonの中でPythonの流儀に従ってエラーを補足し、ワークフローにて最終的なハンドリングをしてしかるべきエラー後の処理に受け渡すことを期待したものである。
Digdagでは、Pythonにてraiseで例外を発生させたり、 sys.exitで0以外を受け渡すことにより、 ワークフロー側で _error:パラメータと連携できるようになっている。

なっているのだが、落とし穴に気づいた。

script01script03も、エラーが発生して例外処理に最初に行ったあと、 errmsg 変数にエラーメッセージをセットしようとしている。
この errmsg自体は .dig内で定義してもしていなくてもどちらでもよい。

それぞれの処理内でエラーが発生し、例外処理に入ってエラーの後始末が行われ、raiseあるいはsys.exit(1)によってスクリプトのしょりが終了する。

そして ワークフローに戻り、 _error: パラメータに入る。その中で期待している sendgmailでエラー通知なりログ出力をしようとしている。

エラーハンドリング後に起こること

いざエラー通知・・・と、その前にデバッグ目的でdigdag.env.paramsを全部表示して確認する。すると各スクリプトを呼び出したそれぞれのワークフローで何がおきるかというと、こうなる。

#foo.script01を呼び出したフロー, foo.script03を呼び出したフロー
digdag.env.params = {"not_found_file":False, "errmsg": " ", ...}
#foo.script02を呼び出したフロー
digdag.env.params = {"not_found_file":True, "errmsg": "script02でエラー", ...}

いずれの処理も、エラーを期待したときに not_found_file を True、errmsg にメッセージをセットした。本来ならば すべての例で、True, エラーメッセージがセットされている のを確認したかった。
ところが、raiseで例外を呼び出した処理、sys.exit(1)でプログラムの戻り値を正常終了以外としたケースの処理では、 なんとフロー内変数が初期値のままなのである

そのため、 script01script03 では、メール通知文にはエラーメッセージはセットされないで配信される。

仮に errmsg を .dig内で定義せず、Python側でのみ定義・セットしていたとしたら、 _error: では errmsg がないという怒られ方をしてエラー内でエラーがでるというよくわからない事態に陥る。

どうも、末端のスクリプト達で本当にエラーという情報をフローに返されると、そこでフローの処理や変数のスコープが途切れるのか、 _error: 時点ではユーザーが定義した変数は使えないということなのかもしれない。

リカバリーの面

今回問題にする部分を無視・除外するとして、いざリカバリーをする段になったときのそれぞれの問題点。

script01script03 はエラー時点からのretryをすれば、エラーの原因が解消されていればそのままPythonが再実行されて解消される。

script02fail>: でエラーを故意に発生させてのエラーハンドリングなので、そのまま再実行しても再び fail>: からの再実行となるため、何度やってもリカバリーはできない。

 そのあたり、フローの組み方が非常に厄介である。

別の問題

今回の発端の一つでもあるが、digdag.env.paramsにかな漢字などマルチバイトの文字をセットしていると、
UnicodeDecodeErrorが発生する。

Traceback (most recent call last):
  File "C:\hoge\digdag\test01\.digdag\tmp\digdag-py-17-3253474438201947048\runner.py", line 14, in <module>
    in_data = json.load(f)
  File "C:\ProgramData\Anaconda3\envs\py310\lib\json\__init__.py", line 293, in load
    return loads(fp.read(),
UnicodeDecodeError: 'cp932' codec can't decode byte 0x85 in position 828: illegal multibyte sequence

自分の書いたPythonならまだ直しようはあるが、Digdagに組み込まれたPythonではいちいち直しようがないので非常に困る・・・。

Digdagは小規模なシステムにも組み込みやすくて手軽で安定していて良いのだが、日本語環境(マルチバイト)では厳しいのが難点か。

おわりに

以上、かなり個人的・局地的な内容を挙げていったが、Digdagは有償のジョブ・ワークフローソフトも合わせて検討した中で、一番導入が楽で、小回りも効いて業務にフィットさせやすいと感じたソフトだと思っている。

なにせjavaでできたファイル1個(batでパッケージングされているが)で動くのだ。そのままローカルで単独で使ってもよし、PostgreSQLと連携してDigdagサーバを起動して常駐して使ってもよし。
タスクスケジューラやcronでは一歩届かなかったスケジュール周りが .digファイルという見た目でわかりやすく設定できるのがポイント。

とにかく扱いやすい。もっと広まればいいのにと思い、自分の今の仕事場では自分が率先して提案して、導入した。

ただ、Pythonに引っ張られて文字コード周りはとにかく弱い。今回のように末端のスクリプトで不意にマルチバイト文字をフロー内に持っていかれると非常に厄介である。
なんとかDigdag公式で改良してほしい気もするし、自分たちの設計や運用でなんとかしなければという思いもあり、悩みどころである。

それに他のソフトと違って GUIなソフト部分は WebUI一本となかなか寂しい。(機能的には最低限)
さがせばきっと便利にしてくれるDigdag用のGUIソフトはあるのだろう。とはいえREST APIが揃っているので、Webアプリは作りやすいはず。

というわけで自分のところでも独自に専用のGUIアプリを作って皆でなんとかわかりやすく使えるようにしようと試行錯誤中である。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2