1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

digdag workflowのタスクをPythonで実装してみる

Last updated at Posted at 2021-06-28

はじめに

最近同じ会社の@Shohei_MiwaがOSSのApache Airflowについて書いてました。

私個人としてはdigdagをよく使っているのですが、調べてみたところdigdagでもPythonで実装できるみたいなのでちょっと試してみました。

以下にサンプルがあるのでそちらを参照しながら試してみます。
http://docs.digdag.io/python_api.html#

環境

  • Windows Server 2019
  • digdag v0.10.0
  • python 3.7

実行手順

事前確認・準備

サーバにRDPログインしてdigdagがインストールされていることを確認します。

PS C:\Users\Administrator> digdag -v
2021-06-28 13:18:03 +0000: Digdag v0.10.0
Usage: digdag <command> [options...]
  Local-mode commands:
    init <dir>                         create a new workflow project
    r[un] <workflow.dig>               run a workflow
    c[heck]                            show workflow definitions
    sched[uler]                        run a scheduler server
    migrate (run|check)                migrate database
    selfupdate                         update cli to the latest version

  Server-mode commands:
    server                             start server
省略

pythonがインストールされていることを確認します。

PS C:\Users\Administrator> python -V
Python 3.7.3

てっきりpipでインストールできると思ってたのですがPyPIには登録されていないようです。

PS C:\work\digdag\server> pip install digdag
Collecting digdag
  Could not find a version that satisfies the requirement digdag (from versions: )
No matching distribution found for digdag

過去の記事を見るとdigdagは以下で動いている場合のみimportが可能になるようです。

ファイル作成

リンクを確認しながらworkflowファイルとpythonファイルを作成します。

  • workflowファイル
PS C:\work\digdag\sample_python> ls


    ディレクトリ: C:\work\digdag\sample_python


Mode                LastWriteTime         Length Name
----                -------------         ------ ----
d-----       2021/06/28     14:10                .digdag
d-----       2021/06/28     14:11                tasks
-a----       2021/06/28     14:09             80 workflow.dig


PS C:\work\digdag\sample_python> gc workflow.dig
+step1:
  py>: tasks.MyWorkflow.step1

+step2:
  py>: tasks.MyWorkflow.step2
  • pythonファイル
PS C:\work\digdag\sample_python> ls tasks


    ディレクトリ: C:\work\digdag\sample_python\tasks


Mode                LastWriteTime         Length Name
----                -------------         ------ ----
d-----       2021/06/28     14:11                __pycache__
-a----       2021/06/28     14:06            107 __init__.py


PS C:\work\digdag\sample_python> gc .\tasks\__init__.py
class MyWorkflow(object):
  def step1(self):
    print("step1")

  def step2(self):
    print("step2")

digdag実行

作成したworkflowファイルをローカルモードで実行してみます。

PS C:\work\digdag\sample_python> digdag run .\workflow.dig
2021-06-28 14:11:00 +0000: Digdag v0.10.0
2021-06-28 14:11:02 +0000 [WARN] (main): Reusing the last session time 2021-06-28T00:00:00+00:00.
2021-06-28 14:11:02 +0000 [INFO] (main): Using session C:\work\digdag\sample_python\.digdag\status\20210628T000000+0000.
2021-06-28 14:11:02 +0000 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2021-06-28T00:00:00+00:00
2021-06-28 14:11:03 +0000 [INFO] (0017@[0:default]+workflow+step1): py>: tasks.MyWorkflow.step1
step1
2021-06-28 14:11:04 +0000 [INFO] (0017@[0:default]+workflow+step2): py>: tasks.MyWorkflow.step2
step2
Success. Task state is saved at C:\work\digdag\sample_python\.digdag\status\20210628T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

出力されたメッセージを見る限りpythonファイルに記載された"step1"と"step2"が出力されていることがわかります。

init.py以外からの呼び出し

サンプルのworkflowファイルの以下の実装を見ると、pythonの呼び出しは[フォルダ名].[クラス名].[関数名]で指定されているように見えます。

+step1:
  py>: tasks.MyWorkflow.step1

それを踏まえて以下のpythonファイルを作成してみます。
敢えてクラスは定義していません。

PS C:\work\digdag\sample_python> gc .\tasks\sample.py
from __future__ import print_function
import digdag

def step3():
  digdag.env.store({'my_value': 1})

def step4():
  print("step4: %s" % digdag.env.params["my_value"])

workflow.digファイルの実装は[フォルダ名].[ファイル名].[関数名]に変更します。

PS C:\work\digdag\sample_python> gc workflow.dig
+step1:
  py>: tasks.sample.step3

+step2:
  py>: tasks.sample.step4

workflowを実行してみたところ、想定通り"step4: 1"が出力されていました。

PS C:\work\digdag\sample_python> digdag run .\workflow.dig
2021-06-28 14:37:17 +0000: Digdag v0.10.0
2021-06-28 14:37:19 +0000 [WARN] (main): Using a new session time 2021-06-28T00:00:00+00:00.
2021-06-28 14:37:19 +0000 [INFO] (main): Using session C:\work\digdag\sample_python\.digdag\status\20210628T000000+0000.
2021-06-28 14:37:19 +0000 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2021-06-28T00:00:00+00:00
2021-06-28 14:37:21 +0000 [INFO] (0017@[0:default]+workflow+step1): py>: tasks.sample.step3
2021-06-28 14:37:21 +0000 [INFO] (0017@[0:default]+workflow+step2): py>: tasks.sample.step4
step4: 1
Success. Task state is saved at C:\work\digdag\sample_python\.digdag\status\20210628T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

特定ファイル内のクラスにある関数を呼び出したい場合は以下の記載をすることで呼び出せそうです。

  • [フォルダ名].[ファイル名].[クラス名].[関数名]

終わりに

以下を見ているともう少し複雑なこともできそうなので、時間あるときに試してみたいと思います。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?