Digdagとは
ワークフローエンジンと呼ばれるもので、データ分析基盤を構築する際に、Shell ScriptでPythonバッチを順に流しているような場合に、実行順序をyamlで定義できます。
serverモードというものがあって、複数ホストによる分散コンピューティングもできるので、場合によってはCeleryを導入しなくても、すべてDigdagで済ますこともできるのではないかと思い、調査を始めました。
ハマりポイント
Language API - Python を使うに当たって、Pythonエンジニアが事前に知っておいた方がよいこと。
1.digdagのPythonパッケージはどこで配布されているか
次のサンプルを見てみましょう。
import digdag
class MyWorkflow(object):
def step1(self):
digdag.env.store({'my_value': 1})
def step2(self):
print("step2: %s" % digdag.env.params["my_value"])
普通に考えると pip install digdag
が必要かと思うのですが、PyPIには登録されていません。
実は、Digdag配下で動いている時のみ、import digdag
できるのです。
それがわからなくて(何ができるのか、直接ソースコードが見たかったのもあって)、3時間ぐらい情報を探しまくりました。
悔しかったので、どこでやっているのか探してみました。
実態は、ここのようです。
再配布の必要がないので、まあ賢いやり方ですが、戸惑いました。
2.APIドキュメントにすべて記載されていない。
Language API - Python のAPIドキュメントは、やけにあっさりしています。
が、ここに書かれていること以上のことが実際にはできるので、機能不足で諦めてしまう前に、examples の *.py
を一読してみることをお薦めします。
引数の渡し方のバリエーション
APIドキュメントを見て、もっと自由にPythonに引数を渡せないのかと思いましたが、アンドキュメントではあるものの、examplesにはありました。
examples/python_args.dig
timezone: UTC
+a:
py>: examples.python_args.required_arguments
required1: "1"
required2: 2
+b:
py>: examples.python_args.optional_arguments
+c:
py>: examples.python_args.mixed_arguments
arg1: "a"
arg2: {b: "c"}
+d:
py>: examples.python_args.keyword_arguments
arg1: "a"
key1: "a"
key2: "val2"
examples/python_args.py
from __future__ import print_function
import digdag
def required_arguments(required1, required2):
print("required1 = " + str(required1))
print("required2 = " + str(required2))
def optional_arguments(optional1=None, optional2="default"):
print("optional1 = " + str(optional1))
print("optional2 = " + str(optional2))
def mixed_arguments(arg1, arg2=None, arg3=None):
print("arg1 = " + str(arg1))
print("arg2 = " + str(arg2))
print("arg3 = " + str(arg3))
def keyword_arguments(arg1, **kw):
print("arg1 = " + str(arg1))
print("keywords = " + str(kw))
動的なワークフロー
静的なワークフローの場合はよいのですが、動的なワークフローの場合はどうでしょうか。
簡単な例では、APIドキュメントにあるように、追加のPythonタスクをその場で実行できます。
workflow.dig:
+step1:
py>: tasks.MyWorkflow.step1
+step2:
py>: tasks.MyWorkflow.step2
Generating Python child tasks:
import digdag
class MyWorkflow(object):
def step1(self):
digdag.env.add_subtask(MyWorkflow.step3, arg1=1)
def step2(self, my_value="default"):
print("step2: %s" % my_value)
def step3(self, arg1):
print("step3: %s" % arg1)
一番やりたかったのが、あるタスクの結果により、後続のタスクに別々の引数を渡し、かつ並列で多重度も可変で実行したい場合です。
最初はAPIドキュメントの情報だけで、色々トリッキーなことを考えていたのですが、examplesを眺めていると、そのものズバリでスマートな方法がありました。
examples/generate_subtasks.dig
timezone: UTC
+split:
py>: examples.generate_subtasks.ParallelProcess.split
+parallel_process:
py>: examples.generate_subtasks.ParallelProcess.run
examples/generate_subtasks.py
from __future__ import print_function
import digdag
class ParallelProcess(object):
def split(self):
digdag.env.store({"task_count": 3})
def run(self, task_count):
for i in range(task_count):
digdag.env.add_subtask(ParallelProcess.subtask, index=i)
digdag.env.subtask_config["_parallel"] = True
def subtask(self, index):
print("Processing " + str(index))
うーん、簡潔で美しい。ちゃんとそういうニーズにも答えているのですね。
工夫によっては、Digdagだけでバッチサーバーの分散処理にも応用できるのではないか、という気がしてきました。
もう少し検証してみたいと思います。
追記:実機検証
キューとなる postgres 1台 + Digdag server 2台の環境を作って、検証してみました。
結果は、上記タスクの
- +split は、Digdag server 1台目で走る
- +parallel_process は、Digdag server 2台目で走る。subtask 3つも、2台目で走る。
という感じでした。(逆もありえます。)
各タスクは分散されますが、サブタスクは分散されないようです。
要件が動的な並列処理の分散だとすると、その部分はマッチしないように感じました。