Python
digdag

DigdagのPython APIを使う

logo-digdag-rec-tr.png

Digdagとは

ワークフローエンジンと呼ばれるもので、データ分析基盤を構築する際に、Shell ScriptでPythonバッチを順に流しているような場合に、実行順序をyamlで定義できます。

serverモードというものがあって、複数ホストによる分散コンピューティングもできるので、場合によってはCeleryを導入しなくても、すべてDigdagで済ますこともできるのではないかと思い、調査を始めました。

ハマりポイント

Language API - Python を使うに当たって、Pythonエンジニアが事前に知っておいた方がよいこと。

1.digdagのPythonパッケージはどこで配布されているか

次のサンプルを見てみましょう。

import digdag
class MyWorkflow(object):
  def step1(self):
    digdag.env.store({'my_value': 1})

  def step2(self):
    print("step2: %s" % digdag.env.params["my_value"])

普通に考えると pip install digdag が必要かと思うのですが、PyPIには登録されていません。

実は、Digdag配下で動いている時のみ、import digdagできるのです。
それがわからなくて(何ができるのか、直接ソースコードが見たかったのもあって)、3時間ぐらい情報を探しまくりました。

悔しかったので、どこでやっているのか探してみました。
実態は、ここのようです。

再配布の必要がないので、まあ賢いやり方ですが、戸惑いました。

2.APIドキュメントにすべて記載されていない。

Language API - Python のAPIドキュメントは、やけにあっさりしています。

が、ここに書かれていること以上のことが実際にはできるので、機能不足で諦めてしまう前に、examples*.py を一読してみることをお薦めします。

引数の渡し方のバリエーション

APIドキュメントを見て、もっと自由にPythonに引数を渡せないのかと思いましたが、アンドキュメントではあるものの、examplesにはありました。

examples/python_args.dig

timezone: UTC

+a:
  py>: examples.python_args.required_arguments
  required1: "1"
  required2: 2

+b:
  py>: examples.python_args.optional_arguments

+c:
  py>: examples.python_args.mixed_arguments
  arg1: "a"
  arg2: {b: "c"}

+d:
  py>: examples.python_args.keyword_arguments
  arg1: "a"
  key1: "a"
  key2: "val2"

examples/python_args.py

from __future__ import print_function
import digdag

def required_arguments(required1, required2):
    print("required1 = " + str(required1))
    print("required2 = " + str(required2))

def optional_arguments(optional1=None, optional2="default"):
    print("optional1 = " + str(optional1))
    print("optional2 = " + str(optional2))

def mixed_arguments(arg1, arg2=None, arg3=None):
    print("arg1 = " + str(arg1))
    print("arg2 = " + str(arg2))
    print("arg3 = " + str(arg3))

def keyword_arguments(arg1, **kw):
    print("arg1 = " + str(arg1))
    print("keywords = " + str(kw))

動的なワークフロー

静的なワークフローの場合はよいのですが、動的なワークフローの場合はどうでしょうか。
簡単な例では、APIドキュメントにあるように、追加のPythonタスクをその場で実行できます。

workflow.dig:

+step1:
  py>: tasks.MyWorkflow.step1

+step2:
  py>: tasks.MyWorkflow.step2

Generating Python child tasks:

import digdag
class MyWorkflow(object):
  def step1(self):
    digdag.env.add_subtask(MyWorkflow.step3, arg1=1)

  def step2(self, my_value="default"):
    print("step2: %s" % my_value)

  def step3(self, arg1):
    print("step3: %s" % arg1)

一番やりたかったのが、あるタスクの結果により、後続のタスクに別々の引数を渡し、かつ並列で多重度も可変で実行したい場合です。

最初はAPIドキュメントの情報だけで、色々トリッキーなことを考えていたのですが、examplesを眺めていると、そのものズバリでスマートな方法がありました。

examples/generate_subtasks.dig

timezone: UTC

+split:
  py>: examples.generate_subtasks.ParallelProcess.split

+parallel_process:
  py>: examples.generate_subtasks.ParallelProcess.run

examples/generate_subtasks.py

from __future__ import print_function
import digdag

class ParallelProcess(object):
    def split(self):
        digdag.env.store({"task_count": 3})

    def run(self, task_count):
        for i in range(task_count):
            digdag.env.add_subtask(ParallelProcess.subtask, index=i)
        digdag.env.subtask_config["_parallel"] = True

    def subtask(self, index):
        print("Processing " + str(index))

うーん、簡潔で美しい。ちゃんとそういうニーズにも答えているのですね。

工夫によっては、Digdagだけでバッチサーバーの分散処理にも応用できるのではないか、という気がしてきました。
もう少し検証してみたいと思います。

追記:実機検証

キューとなる postgres 1台 + Digdag server 2台の環境を作って、検証してみました。

結果は、上記タスクの

  • +split は、Digdag server 1台目で走る
  • +parallel_process は、Digdag server 2台目で走る。subtask 3つも、2台目で走る。

という感じでした。(逆もありえます。)
各タスクは分散されますが、サブタスクは分散されないようです。
要件が動的な並列処理の分散だとすると、その部分はマッチしないように感じました。