概要
Argo Workflowsで日付(時刻)指定が必要なジョブを実行するときはどうすれば良いんだっけ? というのを調査・検討してみた結果を整理したものです。
- 具体的な例として日付指定で実行する必要があるワークフローを定義する方法を記載しています。
- Argo初心者が書いているので、概ねドキュメントを見れば分かるレベルになっており、上級者向けの内容は特にありません。
Workflow
引数で日付を指定可能にする
実行時に時間や日付を指定する用途には arguments
を利用することができます。指定された arguments
の値は Workflow の中で {{workflow.parameters.date}}
という形で参照することができます。
下記は arguments
で指定された日付(2020-06-28
など)を色々な形で出力するWorkflowの例です。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: daily-wf-
spec:
entrypoint: daily
# 引数の定義
arguments:
parameters:
- name: date
value: ""
templates:
- name: daily
steps:
- - name: step1
template: echo
- - name: step2
template: date
# echoで日付を出力する
- name: echo
container:
image: debian:buster-slim
command: ["/bin/bash", "-c"]
args:
- |
echo "Date: {{workflow.parameters.date}}"
# dateコマンドに日付を渡す
- name: date
container:
image: debian:buster-slim
command: ["/bin/bash", "-c"]
args:
- |
echo "DateTime: `date -d {{workflow.parameters.date}}`"
Workflowをsubmitする際に -p
パラメータを使って arguments
に値を渡すことができます。
$ argo submit daily-wf.yaml -p "date=2020-01-01"
Name: daily-wf-8qmv7
Namespace: default
ServiceAccount: default
Status: Pending
Created: Mon Jun 29 01:08:50 +0900 (now)
Parameters:
date: 2020-01-01
$ argo logs daily-wf-8qmv7
daily-wf-8qmv7-934767017: Date: 2020-01-01
daily-wf-8qmv7-3044569517: DateTime: Wed Jan 1 00:00:00 UTC 2020
デフォルト引数を利用する
-p
で引数を渡さなかった場合には、value
で指定されたデフォルト値(今回の場合は""
)が代入されます。コマンドがデフォルト値でも動作する場合は良いのですが、今回はそのままでは正常に動作しないため、何らかの対応を行う必要があります。
ここでは script の stdout を他の template から参照する機能を利用してみます。
追加されている要素は下記のとおりです。
-
default-date
という step を最初に追加-
default-date
では date の値を stdout に出力する。ただし date が空の場合に本日の日付
を stdout に出力する。
-
- 各stepに
arguments
を追加して{{steps.default-date.outputs.result}}
という形でdefault-date
の stdout の内容(つまり日付)を参照する - template側では
inputs
によりarguments
で指定された値を受け取り{{inputs.parameters.date}}
という形で参照する
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: daily-wf-
spec:
entrypoint: daily
arguments:
parameters:
- name: date
value: ""
templates:
- name: daily
steps:
- - name: default-date
template: default-date
- - name: step1
template: echo
arguments:
parameters:
- name: date
value: "{{steps.default-date.outputs.result}}"
- - name: step2
template: date
arguments:
parameters:
- name: date
value: "{{steps.default-date.outputs.result}}"
# dateがdefaultの場合は今日の日付を出力
- name: default-date
script:
image: debian:buster-slim
command: ["/bin/bash"]
source: |
d="{{workflow.parameters.date}}"
[ -z "${d}" ] && d=`date +%Y-%m-%d`
echo "${d}"
# echoで日付を出力する
- name: echo
inputs:
parameters:
- name: date
container:
image: debian:buster-slim
command: ["/bin/bash", "-c"]
args:
- |
echo "Date: {{inputs.parameters.date}}"
# dateコマンドに日付を渡す
- name: date
inputs:
parameters:
- name: date
container:
image: debian:buster-slim
command: ["/bin/bash", "-c"]
args:
- |
echo "DateTime: `date -d {{inputs.parameters.date}}`"
これで引数なしで実行した場合には当日の日付が利用されるようになります。
(Submitした日とズレているのはコンテナがUTCになっているため)
$ argo submit daily-default-wf.yaml
Name: daily-wf-k5tqh
Namespace: default
ServiceAccount: default
Status: Pending
Created: Mon Jun 29 01:12:08 +0900 (now)
Parameters:
date:
$ argo logs daily-wf-k5tqh
daily-wf-k5tqh-2822936760: 2020-06-28
daily-wf-k5tqh-280102960: Date: 2020-06-28
daily-wf-k5tqh-3556009402: DateTime: Sun Jun 28 00:00:00 UTC 2020
CronWorkflow
定期実行する
CronWorkflowで時刻(日付)指定が必要な処理を定期実行する場合、基本的には実行日時が基準(当日、または前日など)になると思います。
例えばDigdagでは session_time
などでスケジュールされた時刻を参照できるようですが、 Argo には今のところそのような機能はないようです。そのため自力で現在時刻を取得して利用する必要があります。
ちょうどデフォルト引数を利用したWorkflowが、現在時刻から日付を取得する形になっているので、CronWorkflowでもそのまま利用すれば良さそうです。下記は先ほどのWorkflowと同じ処理を、毎分実行するようにしたCronWorkflowの例となります。
(workflowSpecの部分は先ほどの例をそのまま利用している)
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: cron-wf
spec:
schedule: "*/1 * * * *"
workflowSpec:
entrypoint: daily
# 引数の定義
arguments:
parameters:
- name: date
value: ""
templates:
- name: daily
steps:
- - name: default-date
template: default-date
- - name: step1
template: echo
arguments:
parameters:
- name: date
value: "{{steps.default-date.outputs.result}}"
- - name: step2
template: date
arguments:
parameters:
- name: date
value: "{{steps.default-date.outputs.result}}"
# dateがdefaultの場合は今日の日付を出力
- name: default-date
script:
image: debian:buster-slim
command: ["/bin/bash"]
source: |
d="{{workflow.parameters.date}}"
[ -z "${d}" ] && d=`date +%Y-%m-%d`
echo "${d}"
# echoで日付を出力する
- name: echo
inputs:
parameters:
- name: date
container:
image: debian:buster-slim
command: ["/bin/bash", "-c"]
args:
- |
echo "Date: {{inputs.parameters.date}}"
# dateコマンドに日付を渡す
- name: date
inputs:
parameters:
- name: date
container:
image: debian:buster-slim
command: ["/bin/bash", "-c"]
args:
- |
echo "DateTime: `date -d {{inputs.parameters.date}}`"
CronWorkflowはcron create
コマンドで作成することができます。
今回の場合は特別な引数などは不要です(workflowSpecにargumentsがあるが毎回デフォルト値、つまり実行時の日付になる)。
$ argo cron create cron-wf.yaml
Name: cron-wf
Namespace: default
Created: Mon Jun 29 01:21:15 +0900 (now)
Schedule: * * * * *
Suspended: false
日付指定で再実行する
CronWorkflowは submit --from
を利用することで単発のWorkflowとして実行することが可能です。また再実行時にはパラメータも渡せるので、下記のようにすることで特定日の再実行を行うことができます。
-
submit --from
ではKubernetes上のCRDからWorkflowを生成します。そのためcronwf/[CronWorkflowの名称]
というKubernetesリソース名で指定する必要があります。
$ argo submit --from cronwf/cron-wf -p "date=2020-01-01"
Name: cron-wf-2pcxr
Namespace: default
ServiceAccount: default
Status: Pending
Created: Mon Jun 29 01:18:05 +0900 (now)
Parameters:
date: 2020-01-01
$ argo logs cron-wf-2pcxr
cron-wf-2pcxr-3997186673: 2020-01-01
cron-wf-2pcxr-666284619: Date: 2020-01-01
cron-wf-2pcxr-937805997: DateTime: Wed Jan 1 00:00:00 UTC 2020
その他(細かいメモ)
CronWorkflowをJSTでスケジュールする
CronWorkflowのスケジュールはデフォルトでは UTC
として扱われますが、timezone
を指定することでJST等に変更することが可能です。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: cron-wf
spec:
schedule: "0 0 * * *"
# scheduleをJSTにする
timezone: "Asia/Tokyo"
日本時間で1日単位の処理をする場合などに便利そうなのですが、影響を受けるのはあくまでCronWorkflowのスケジュールだけという注意点があります(ある意味当たり前ですが)。コンテナ側のタイムゾーンはそのままのため、先ほどの例のようにdate
がUTC
で計算されて意図した日付にならない可能性が高くなります。
コンテナ側も個別にJSTに設定するか、timezone
を諦めて全てをUTCで動くようにしておくか、どちらかに統一しておいた方が紛らわしくなくて良さそうです。
CronWorkflowの起動遅れの考慮
CronWorkflow内では現在時刻を利用して日付を取得しています。1日単位であればあまり問題ない気はするのですが、例えば分単位になった場合には、予定起動時間から実際に処理が行われるまでに時間がかかると、想定とは違う時間指定で動作してしまうことが考えられます。
この問題を厳密に避けるためには、Digdagのsession_timeのように、スケジュール上の起動時刻をArgo側から渡してもらえると良いのですが、今のところそのような機能はないようです。
(処理内容にもよりますが)とりあえずの対策としては startingDeadlineSeconds
に適切な値を設定して、想定時間内に起動しなかった場合は諦めるようにしておくのが無難なのかなと思っています。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: cron-wf
spec:
# 30分毎にWorkflowを実行する
schedule: "*/30 * * * *"
# 900秒(15分)以内に起動しなければ諦める
startingDeadlineSeconds: 900