概要
新しいアーキテクチャになった Argo Events を使ってワークフローをトリガする手順について書いてます。
私事ですが Argo Events にはじめて着手したことと、なんと着手した初日に偶然アーキテクチャが変わったことで情報量が少なく苦労しました。また Argo Events は便利である一方であまり知られていないとも思っているため今回記事にしています。
この記事では Argo Events が何かを簡単に説明した後、GCS に アップロードされた json をその瞬間に BigQuery に書き込むケースについて紹介しています。
汎用性と拡張性が高いので、この記事を参考に他のイベントやトリガのカスタマイズもご自身で作成できるようになると思います。
Argo Events とは?
Argo Events とは kubernetes ネイティブのイベント駆動型自動化フレームワークです。
Argo Events は Argo Projects の1つで今でも開発が進んでいます。つい先日 v0.17.0 がリリースされ、アーキテクチャが大幅に変わりました。
Argo Events を使うと以下のことが出来るようになります。
- S3 バケットにオブジェクトがアップロードされたことをトリガに、kubernetes の Job を起動する。
- Slack の通知をトリガに、メッセージ内容を AWS Lambda に飛ばして実行する。
- GCP Pub/Sub からの通知をトリガに、Argo Workflow を実行する。
上記から分かるように Argo Events は「どこからの情報を受け取るのか」と「それをトリガにして何を実行するのか」を自由に組み合わせることが可能です。
Argo Events では前者を EventSource、後者を Sensor という概念にしており、ユーザーは任意のソースとトリガを作成することができます。
2020年7月時点で Argo Events が対応している EventSource は以下です。ここから情報を受け取ることができます。
- AWS SQS
- GCP PubSub
- GitHub
- K8s Resources
- Slack
- etc
また対応している Sensor のトリガーは以下です。ある通知を受け取って実行できる対象です。
- Argo Workflow Trigger
- AWS Lambda Trigger
- HTTP Trigger
- Slack Trigger
- Kubernetes Object Trigger
- etc
Argo Events は Kubernetes ネイティブなツールです。クラスタに各 CRD をデプロイすることになります。
v0.17.0 からのアーキテクチャは以下のようになっています。v0.16.0 以前のアーキテクチャはそこまで詳しくないのと、おそらく戻ることはないため説明は割愛します。

EventSource と Sensor に加えて、EventBus という概念が登場します。EventBus は簡単に言うと中継役です。EventSource で受け取った情報を Sensor に渡しています。厳密には EventSource が EventBus の実体にデータを書き込み、書き込まれた内容を Sensor が読み取るという構造になっています。
実際にやってみる
今回試すことの全体像はこんな感じです。

GCS に json ファイルが生成された時、その json ファイルの中身を BigQuery のテーブルに書き込みます。頻繁に json ファイルが GCS に吐かれ、吐かれたデータもすぐに BigQuery で使いたいという想定です。
BigQuery にデータを書き込むために Embulk を使います。Embulk には「どのファイルを処理するか」を教えてやる必要があります。また json が吐かれたタイミングで Embulk を実行したいです。これらを今回の Argo Events で実現します。
GCS にファイルが置かれたという情報を Pub/Sub 経由で Argo Events に渡し、そのまま Argo Workflow 上で Embulk を展開、失敗したら通知を飛ばすという感じです。
Pub/Sub Notifications for Cloud Storage の設定
まずは GCS にオブジェクトがアップロードされたことを Pub/Sub に飛ばす設定をします。
GCS には Pub/Sub Notifications for Cloud Storage という機能があります。これはオブジェクトが生成・削除・更新・アーカイブされたことを Pub/Sub に通知する機能です。類似機能かつ旧機能として Object change notification がありますが、ほぼ同じことが出来ることと、よりシンプルであることを理由に 公式では Pub/Sub Notifications for Cloud Storage を推奨としています。
コンソールではこの設定はできないため、gsutil で設定を行います。
gsutil notification create -t projects/[YOUR_PROJECT]/topics/[YOUR_PUBSUB_TOPIC] -f json -e OBJECT_FINALIZE gs://[YOUR_BUCKET]
-f json
は json フォーマットで通知を送ること、 -e OBJECT_FINALIZE
はオブジェクトの生成時のみ通知を飛ばすというオプションです。その他のオプションはこちらです。
サブスクライバは指定しなければ gsutil が自動的に作成してくれます。
Embulk の設定
Embulk は Tresure Data が開発するバルクインサートツールです。Embulk の情報は巷に溢れているので割愛します。
今回は GCS のデータを BigQuery へ書き込む(append)するケースです。input に embulk-input-gcs を、output に embulk-output-bigquery のプラグインを利用します。
in:
type: gcs
bucket: {{env.BUCKET_NAME}}
path_prefix: {{env.PATH}}
auth_method: json_key
json_keyfile: /app/credentials/gcp_credential.json
out:
type: bigquery
auth_method: json_key
json_keyfile: /app/credentials/gcp_credential.json
project: {{ env.GCP_PROJECT_ID }}
location: asia-northeast1
encoders: {type: gzip}
dataset: {{ env.GCP_DATASET_NAME }}
table: {{ env.TABLE_NAME }}
mode: append
このファイルとは別に Embulk を起動する Dockerfile を作成し、イメージをビルドしておきます。
Argo Events の設定
Argo Events のレポジトリを clone します。 -b stable
で v0.17.0 がインストールされます。
このレポジトリに EventSource EventBus Sensor の各 YAML ファイルがあるのでカスタマイズして適用していく流れになります。
$ git clone -b stable https://github.com/argoproj/argo-events
まず namespace: argo-events を作成し、そこに ClusterRoleBindings を作成します。
$ kubectl create namespace argo-events
$ kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install.yaml
次に EventBus を定義して apply します。赤枠で囲った箇所になります。

EventBus リソースを作成すると Deployment が作られ、そいつが pod を生成します。
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
spec:
nats:
native:
replicas: 1
auth: token
containerTemplate:
resources:
requests:
cpu: "100m"
続いて EventSource を定義して apply します。

常駐する EventSource の Pod が Pub/Sub からメッセージを受け取るために、GCP の Credentials キーが必要になります。今回は volumeMounts で Credentials キー を読み込んでいます。
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: es-pub-sub
spec:
template:
serviceAccountName: argo-events-sa
container:
volumeMounts:
- name: gcp-pubsub-data
mountPath: "/app/credentials/"
volumes:
- name: gcp-pubsub-data
secret:
secretName: gcp-credentials
pubSub:
json-generation:
jsonBody: true
projectID: [YOUR_PROJECT]
topic: gcs-file-generation
credentialsFile: "/app/credentials/gcp_credentials.json"
最後に Sensor を定義して apply します。この部分で EventSource と EventBus から通知を受け取り Workflow を実行します。

いくつかポイントがあるので分割して解説します。
まずベースになる YAML は以下になります。今回トリガ対象は Job ではなく Workflow にしています。
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: sensor-gcp-pubsub
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: from-gcp-pubsub
eventSourceName: es-pub-sub
eventName: json-generation
triggers:
- template:
name: gcp-embulk-wf
k8s:
group: argoproj.io
version: v1alpha1
resource: workflows
operation: create
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: gcp-embulk-
spec:
entrypoint: embulk
arguments:
parameters:
- name: path
value: value
templates:
- name: embulk
serviceAccountName: argo-events-sa
inputs:
parameters:
- name: path
container:
image: [DOCKER IMAGE]
command: [embulk]
args: ["run", "[YOUR FILE].yml.liquid"]
env:
- name: path
value: "{{inputs.parameters.PATH}}"
envFrom:
- configMapRef:
name: pubsub-config
volumeMounts:
- name: gcp-pubsub-data
mountPath: "/app/credentials/"
volumes:
- name: gcp-pubsub-data
secret:
secretName: gcp-credentials
parameters:
- src:
dependencyName: from-gcp-pubsub
dataKey: attributes.objectId
dest: spec.arguments.parameters.0.value
最終的には embulk の実行引数(変数)として json のファイル名を渡したいので、以下のようにしています。少し複雑ですがPub/Sub のデータを以下のようにして コンテナに渡しています。
- EventSource のデータを 一番下の parameters で受け取る。
-
spec.arguments.parameters.0.value
に渡す。 -
spec. templates.0.inputs.parameters.0.name
に渡す。 -
value: "{{inputs.parameters.PATH}}"
に渡す。
Argo Events で受け取る Json の中身は Context
と Data
の2種類があります。
前者はいわゆる Argo Events の EventSource のメタデータです。後者は Pub/Sub からのデータが base64 でエンコードされたものが格納されています。
ドキュメントにもある通り、GCS から Pub/Sub に転送されるデータは attributes
と payload
で大別でき、オブジェクト名は attributes.objectId
です。
なので GCS にファイルが置かれると、そのファイル名は attributes.objectId
として長い旅を経て value: "{{inputs.parameters.PATH}}"
に届きます。
ただし、このままだとファイル生成時だけでなくフォルダ生成時にも通知が届いてしまいます。なので Argo Events の filters 機能を使って制限をしていきます。ついでに対象ファイルも json だけにしてみます。
これによって value
以外の通知を受け取ってもトリガしなくなります。今回は json ファイルのみを対象にしたいので正規表現を value に渡しています。
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: sensor-gcp-pubsub
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: from-gcp-pubsub
eventSourceName: es-pub-sub
eventName: json-generation
filters:
name: data-filter
data:
- path: attributes.objectId
type: string
value:
- "^.*.json$"
これらを含めて、Workflow に retry や exit-handler を加えたのが以下です。
exit-handler では 失敗時に slack 通知する Python スクリプトを書いて Docker image にしています。{{workflow.name}} {{workflow.status}} の情報を受け取っています。
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: sensor-gcp-pubsub
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: from-gcp-pubsub
eventSourceName: es-pub-sub
eventName: json-generation
filters:
name: data-filter
data:
- path: attributes.objectId
type: string
value:
- "^.*.json$"
triggers:
- template:
name: gcp-embulk-wf
k8s:
group: argoproj.io
version: v1alpha1
resource: workflows
operation: create
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: gcp-embulk-pod-
spec:
entrypoint: embulk
arguments:
parameters:
- name: path
value: value
onExit: exit-handler
templates:
- name: embulk
serviceAccountName: argo-events-sa
inputs:
parameters:
- name: path
container:
image: [DOCKER IMAGE]
command: [embulk]
args: ["run", "[YOUR FILE].yml.liquid"]
env:
- name: PATH
value: "{{inputs.parameters.path}}"
envFrom:
- configMapRef:
name: pubsub-config
volumeMounts:
- name: gcp-pubsub-data
mountPath: "/app/credentials/"
resources:
requests:
memory: "1000Mi"
cpu: "1"
limits:
memory: "1000Mi"
retryStrategy:
limit: 2
retryPolicy: "Always"
backoff:
duration: "3s"
factor: 2
- name: exit-handler
steps:
- - name: notify-failure
template: notify-failure
when: "{{workflow.status}} != Succeeded"
- name: notify-failure
container:
image: [DOCKER IMAGE]
command: [sh, -c]
args: ["python notification.py {{workflow.name}} {{workflow.status}}"]
volumes:
- name: gcp-pubsub-data
secret:
secretName: gcp-credentials
parameters:
- src:
dependencyName: from-gcp-pubsub
dataKey: attributes.objectId
dest: spec.arguments.parameters.0.value
展開されいる Pod たちは以下のようになるはずです。
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
eventbus-controller-574b764cd5-6f52x 1/1 Running 0 47h
eventbus-default-stan-0 2/2 Running 0 16h
eventbus-default-stan-1 2/2 Running 0 16h
eventbus-default-stan-2 2/2 Running 0 16h
eventsource-controller-67cf95488c-vqr64 1/1 Running 0 47h
es-pub-sub-eventsource-zdhmb-d686b4859-9gflq 1/1 Running 0 18h
sensor-pub-sub-sensor-vs82c-59dc76b7df-ffwdw 1/1 Running 0 16h
sensor-controller-59799df65d-scfp2 1/1 Running 0 47h
Configmap と Secret があることを確認した上で、GCS の当該バケットに json ファイルをアップロードすると無事 Workflow が起動し BigQuery に書き込まれているはず。
試しに kubectl logs で EventSource EventBus Sensor の pod の中身を見てみると受け渡されるデータが見れます。
以上!
最後に
現在世の中に様々なトリガツールがあります。デフォルトでイベントトリガが組み込まれているサービスもたくさんあります。クラウドサービスのマネージド化や、SaaS の普及を考えると当然だと思います。
その中でも あえて自分で記述したり設定したりする必要がある Argo Events について取り上げました。エラーログを見てても「……???」と思うことが何度かありデバッグしにくいなという印象があります。ただかなり汎用的に作られているので、かゆいところに手が届くと思います。
Kubernetes ネイティブなイベントトリガの代表感を醸し出している Argo Events を今回の実装にて自分の今後の選択肢の一つにできたかなと思ってます。今後の Argo Events に期待!