LoginSignup
39
22

More than 5 years have passed since last update.

DigdagのPython APIを使う

Last updated at Posted at 2018-02-01

logo-digdag-rec-tr.png

Digdagとは

ワークフローエンジンと呼ばれるもので、データ分析基盤を構築する際に、Shell ScriptでPythonバッチを順に流しているような場合に、実行順序をyamlで定義できます。

serverモードというものがあって、複数ホストによる分散コンピューティングもできるので、場合によってはCeleryを導入しなくても、すべてDigdagで済ますこともできるのではないかと思い、調査を始めました。

ハマりポイント

Language API - Python を使うに当たって、Pythonエンジニアが事前に知っておいた方がよいこと。

1.digdagのPythonパッケージはどこで配布されているか

次のサンプルを見てみましょう。

import digdag
class MyWorkflow(object):
  def step1(self):
    digdag.env.store({'my_value': 1})

  def step2(self):
    print("step2: %s" % digdag.env.params["my_value"])

普通に考えると pip install digdag が必要かと思うのですが、PyPIには登録されていません。

実は、Digdag配下で動いている時のみ、import digdagできるのです。
それがわからなくて(何ができるのか、直接ソースコードが見たかったのもあって)、3時間ぐらい情報を探しまくりました。

悔しかったので、どこでやっているのか探してみました。
実態は、ここのようです。

再配布の必要がないので、まあ賢いやり方ですが、戸惑いました。

2.APIドキュメントにすべて記載されていない。

Language API - Python のAPIドキュメントは、やけにあっさりしています。

が、ここに書かれていること以上のことが実際にはできるので、機能不足で諦めてしまう前に、examples*.py を一読してみることをお薦めします。

引数の渡し方のバリエーション

APIドキュメントを見て、もっと自由にPythonに引数を渡せないのかと思いましたが、アンドキュメントではあるものの、examplesにはありました。

examples/python_args.dig

timezone: UTC

+a:
  py>: examples.python_args.required_arguments
  required1: "1"
  required2: 2

+b:
  py>: examples.python_args.optional_arguments

+c:
  py>: examples.python_args.mixed_arguments
  arg1: "a"
  arg2: {b: "c"}

+d:
  py>: examples.python_args.keyword_arguments
  arg1: "a"
  key1: "a"
  key2: "val2"

examples/python_args.py

from __future__ import print_function
import digdag

def required_arguments(required1, required2):
    print("required1 = " + str(required1))
    print("required2 = " + str(required2))

def optional_arguments(optional1=None, optional2="default"):
    print("optional1 = " + str(optional1))
    print("optional2 = " + str(optional2))

def mixed_arguments(arg1, arg2=None, arg3=None):
    print("arg1 = " + str(arg1))
    print("arg2 = " + str(arg2))
    print("arg3 = " + str(arg3))

def keyword_arguments(arg1, **kw):
    print("arg1 = " + str(arg1))
    print("keywords = " + str(kw))

動的なワークフロー

静的なワークフローの場合はよいのですが、動的なワークフローの場合はどうでしょうか。
簡単な例では、APIドキュメントにあるように、追加のPythonタスクをその場で実行できます。

workflow.dig:

+step1:
  py>: tasks.MyWorkflow.step1

+step2:
  py>: tasks.MyWorkflow.step2

Generating Python child tasks:

import digdag
class MyWorkflow(object):
  def step1(self):
    digdag.env.add_subtask(MyWorkflow.step3, arg1=1)

  def step2(self, my_value="default"):
    print("step2: %s" % my_value)

  def step3(self, arg1):
    print("step3: %s" % arg1)

一番やりたかったのが、あるタスクの結果により、後続のタスクに別々の引数を渡し、かつ並列で多重度も可変で実行したい場合です。

最初はAPIドキュメントの情報だけで、色々トリッキーなことを考えていたのですが、examplesを眺めていると、そのものズバリでスマートな方法がありました。

examples/generate_subtasks.dig

timezone: UTC

+split:
  py>: examples.generate_subtasks.ParallelProcess.split

+parallel_process:
  py>: examples.generate_subtasks.ParallelProcess.run

examples/generate_subtasks.py

from __future__ import print_function
import digdag

class ParallelProcess(object):
    def split(self):
        digdag.env.store({"task_count": 3})

    def run(self, task_count):
        for i in range(task_count):
            digdag.env.add_subtask(ParallelProcess.subtask, index=i)
        digdag.env.subtask_config["_parallel"] = True

    def subtask(self, index):
        print("Processing " + str(index))

うーん、簡潔で美しい。ちゃんとそういうニーズにも答えているのですね。

工夫によっては、Digdagだけでバッチサーバーの分散処理にも応用できるのではないか、という気がしてきました。
もう少し検証してみたいと思います。

追記:実機検証

キューとなる postgres 1台 + Digdag server 2台の環境を作って、検証してみました。

結果は、上記タスクの

  • +split は、Digdag server 1台目で走る
  • +parallel_process は、Digdag server 2台目で走る。subtask 3つも、2台目で走る。

という感じでした。(逆もありえます。)
各タスクは分散されますが、サブタスクは分散されないようです。
要件が動的な並列処理の分散だとすると、その部分はマッチしないように感じました。

39
22
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
39
22