はじめに
最近同じ会社の@Shohei_MiwaがOSSのApache Airflowについて書いてました。
私個人としてはdigdagをよく使っているのですが、調べてみたところdigdagでもPythonで実装できるみたいなのでちょっと試してみました。
以下にサンプルがあるのでそちらを参照しながら試してみます。
http://docs.digdag.io/python_api.html#
環境
- Windows Server 2019
- digdag v0.10.0
- python 3.7
実行手順
事前確認・準備
サーバにRDPログインしてdigdagがインストールされていることを確認します。
PS C:\Users\Administrator> digdag -v
2021-06-28 13:18:03 +0000: Digdag v0.10.0
Usage: digdag <command> [options...]
Local-mode commands:
init <dir> create a new workflow project
r[un] <workflow.dig> run a workflow
c[heck] show workflow definitions
sched[uler] run a scheduler server
migrate (run|check) migrate database
selfupdate update cli to the latest version
Server-mode commands:
server start server
省略
pythonがインストールされていることを確認します。
PS C:\Users\Administrator> python -V
Python 3.7.3
てっきりpipでインストールできると思ってたのですがPyPIには登録されていないようです。
PS C:\work\digdag\server> pip install digdag
Collecting digdag
Could not find a version that satisfies the requirement digdag (from versions: )
No matching distribution found for digdag
過去の記事を見るとdigdagは以下で動いている場合のみimportが可能になるようです。
ファイル作成
リンクを確認しながらworkflowファイルとpythonファイルを作成します。
- workflowファイル
PS C:\work\digdag\sample_python> ls
ディレクトリ: C:\work\digdag\sample_python
Mode LastWriteTime Length Name
---- ------------- ------ ----
d----- 2021/06/28 14:10 .digdag
d----- 2021/06/28 14:11 tasks
-a---- 2021/06/28 14:09 80 workflow.dig
PS C:\work\digdag\sample_python> gc workflow.dig
+step1:
py>: tasks.MyWorkflow.step1
+step2:
py>: tasks.MyWorkflow.step2
- pythonファイル
PS C:\work\digdag\sample_python> ls tasks
ディレクトリ: C:\work\digdag\sample_python\tasks
Mode LastWriteTime Length Name
---- ------------- ------ ----
d----- 2021/06/28 14:11 __pycache__
-a---- 2021/06/28 14:06 107 __init__.py
PS C:\work\digdag\sample_python> gc .\tasks\__init__.py
class MyWorkflow(object):
def step1(self):
print("step1")
def step2(self):
print("step2")
digdag実行
作成したworkflowファイルをローカルモードで実行してみます。
PS C:\work\digdag\sample_python> digdag run .\workflow.dig
2021-06-28 14:11:00 +0000: Digdag v0.10.0
2021-06-28 14:11:02 +0000 [WARN] (main): Reusing the last session time 2021-06-28T00:00:00+00:00.
2021-06-28 14:11:02 +0000 [INFO] (main): Using session C:\work\digdag\sample_python\.digdag\status\20210628T000000+0000.
2021-06-28 14:11:02 +0000 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2021-06-28T00:00:00+00:00
2021-06-28 14:11:03 +0000 [INFO] (0017@[0:default]+workflow+step1): py>: tasks.MyWorkflow.step1
step1
2021-06-28 14:11:04 +0000 [INFO] (0017@[0:default]+workflow+step2): py>: tasks.MyWorkflow.step2
step2
Success. Task state is saved at C:\work\digdag\sample_python\.digdag\status\20210628T000000+0000 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
出力されたメッセージを見る限りpythonファイルに記載された"step1"と"step2"が出力されていることがわかります。
init.py以外からの呼び出し
サンプルのworkflowファイルの以下の実装を見ると、pythonの呼び出しは[フォルダ名].[クラス名].[関数名]で指定されているように見えます。
+step1:
py>: tasks.MyWorkflow.step1
それを踏まえて以下のpythonファイルを作成してみます。
敢えてクラスは定義していません。
PS C:\work\digdag\sample_python> gc .\tasks\sample.py
from __future__ import print_function
import digdag
def step3():
digdag.env.store({'my_value': 1})
def step4():
print("step4: %s" % digdag.env.params["my_value"])
workflow.digファイルの実装は[フォルダ名].[ファイル名].[関数名]に変更します。
PS C:\work\digdag\sample_python> gc workflow.dig
+step1:
py>: tasks.sample.step3
+step2:
py>: tasks.sample.step4
workflowを実行してみたところ、想定通り"step4: 1"が出力されていました。
PS C:\work\digdag\sample_python> digdag run .\workflow.dig
2021-06-28 14:37:17 +0000: Digdag v0.10.0
2021-06-28 14:37:19 +0000 [WARN] (main): Using a new session time 2021-06-28T00:00:00+00:00.
2021-06-28 14:37:19 +0000 [INFO] (main): Using session C:\work\digdag\sample_python\.digdag\status\20210628T000000+0000.
2021-06-28 14:37:19 +0000 [INFO] (main): Starting a new session project id=1 workflow name=workflow session_time=2021-06-28T00:00:00+00:00
2021-06-28 14:37:21 +0000 [INFO] (0017@[0:default]+workflow+step1): py>: tasks.sample.step3
2021-06-28 14:37:21 +0000 [INFO] (0017@[0:default]+workflow+step2): py>: tasks.sample.step4
step4: 1
Success. Task state is saved at C:\work\digdag\sample_python\.digdag\status\20210628T000000+0000 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
特定ファイル内のクラスにある関数を呼び出したい場合は以下の記載をすることで呼び出せそうです。
- [フォルダ名].[ファイル名].[クラス名].[関数名]
終わりに
以下を見ているともう少し複雑なこともできそうなので、時間あるときに試してみたいと思います。