19
7

More than 3 years have passed since last update.

新しくなった Argo Events を使ってワークフローを実行

Last updated at Posted at 2020-08-01

概要

新しいアーキテクチャになった Argo Events を使ってワークフローをトリガする手順について書いてます。

私事ですが Argo Events にはじめて着手したことと、なんと着手した初日に偶然アーキテクチャが変わったことで情報量が少なく苦労しました。また Argo Events は便利である一方であまり知られていないとも思っているため今回記事にしています。

この記事では Argo Events が何かを簡単に説明した後、GCS に アップロードされた json をその瞬間に BigQuery に書き込むケースについて紹介しています。

汎用性と拡張性が高いので、この記事を参考に他のイベントやトリガのカスタマイズもご自身で作成できるようになると思います。

Argo Events とは?

Argo Events とは kubernetes ネイティブのイベント駆動型自動化フレームワークです。

Argo Events は Argo Projects の1つで今でも開発が進んでいます。つい先日 v0.17.0 がリリースされ、アーキテクチャが大幅に変わりました。

argo-events-top-level.png

Argo Events を使うと以下のことが出来るようになります。

  • S3 バケットにオブジェクトがアップロードされたことをトリガに、kubernetes の Job を起動する。
  • Slack の通知をトリガに、メッセージ内容を AWS Lambda に飛ばして実行する。
  • GCP Pub/Sub からの通知をトリガに、Argo Workflow を実行する。

上記から分かるように Argo Events は「どこからの情報を受け取るのか」と「それをトリガにして何を実行するのか」を自由に組み合わせることが可能です。

Argo Events では前者を EventSource、後者を Sensor という概念にしており、ユーザーは任意のソースとトリガを作成することができます。

2020年7月時点で Argo Events が対応している EventSource は以下です。ここから情報を受け取ることができます。

  • AWS SQS
  • GCP PubSub
  • GitHub
  • K8s Resources
  • Slack
  • etc

また対応している Sensor のトリガーは以下です。ある通知を受け取って実行できる対象です。

  • Argo Workflow Trigger
  • AWS Lambda Trigger
  • HTTP Trigger
  • Slack Trigger
  • Kubernetes Object Trigger
  • etc

Argo Events は Kubernetes ネイティブなツールです。クラスタに各 CRD をデプロイすることになります。

v0.17.0 からのアーキテクチャは以下のようになっています。v0.16.0 以前のアーキテクチャはそこまで詳しくないのと、おそらく戻ることはないため説明は割愛します。

スクリーンショット 2020-08-01 8.58.14.png
(出典:https://argoproj.github.io/argo-events/concepts/architecture/)

EventSource と Sensor に加えて、EventBus という概念が登場します。EventBus は簡単に言うと中継役です。EventSource で受け取った情報を Sensor に渡しています。厳密には EventSource が EventBus の実体にデータを書き込み、書き込まれた内容を Sensor が読み取るという構造になっています。

実際にやってみる

今回試すことの全体像はこんな感じです。

スクリーンショット 2020-08-01 16.36.28.png

GCS に json ファイルが生成された時、その json ファイルの中身を BigQuery のテーブルに書き込みます。頻繁に json ファイルが GCS に吐かれ、吐かれたデータもすぐに BigQuery で使いたいという想定です。

BigQuery にデータを書き込むために Embulk を使います。Embulk には「どのファイルを処理するか」を教えてやる必要があります。また json が吐かれたタイミングで Embulk を実行したいです。これらを今回の Argo Events で実現します。

GCS にファイルが置かれたという情報を Pub/Sub 経由で Argo Events に渡し、そのまま Argo Workflow 上で Embulk を展開、失敗したら通知を飛ばすという感じです。

Pub/Sub Notifications for Cloud Storage の設定

まずは GCS にオブジェクトがアップロードされたことを Pub/Sub に飛ばす設定をします。

GCS には Pub/Sub Notifications for Cloud Storage という機能があります。これはオブジェクトが生成・削除・更新・アーカイブされたことを Pub/Sub に通知する機能です。類似機能かつ旧機能として Object change notification がありますが、ほぼ同じことが出来ることと、よりシンプルであることを理由に 公式では Pub/Sub Notifications for Cloud Storage を推奨としています。

コンソールではこの設定はできないため、gsutil で設定を行います。

gsutil notification create -t projects/[YOUR_PROJECT]/topics/[YOUR_PUBSUB_TOPIC] -f json -e OBJECT_FINALIZE gs://[YOUR_BUCKET]

-f json は json フォーマットで通知を送ること、 -e OBJECT_FINALIZE はオブジェクトの生成時のみ通知を飛ばすというオプションです。その他のオプションはこちらです。

サブスクライバは指定しなければ gsutil が自動的に作成してくれます。

Embulk の設定

Embulk は Tresure Data が開発するバルクインサートツールです。Embulk の情報は巷に溢れているので割愛します。

今回は GCS のデータを BigQuery へ書き込む(append)するケースです。input に embulk-input-gcs を、output に embulk-output-bigquery のプラグインを利用します。

config.yml.liquid
in:
  type: gcs
  bucket: {{env.BUCKET_NAME}}
  path_prefix: {{env.PATH}}
  auth_method: json_key
  json_keyfile: /app/credentials/gcp_credential.json

out:
  type: bigquery
  auth_method: json_key
  json_keyfile: /app/credentials/gcp_credential.json
  project: {{ env.GCP_PROJECT_ID }}
  location: asia-northeast1
  encoders: {type: gzip}
  dataset: {{ env.GCP_DATASET_NAME }}
  table: {{ env.TABLE_NAME }}
  mode: append

このファイルとは別に Embulk を起動する Dockerfile を作成し、イメージをビルドしておきます。

Argo Events の設定

Argo Events のレポジトリを clone します。 -b stable で v0.17.0 がインストールされます。
このレポジトリに EventSource EventBus Sensor の各 YAML ファイルがあるのでカスタマイズして適用していく流れになります。

$ git clone -b stable https://github.com/argoproj/argo-events

まず namespace: argo-events を作成し、そこに ClusterRoleBindings を作成します。

$ kubectl create namespace argo-events

$ kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install.yaml

次に EventBus を定義して apply します。赤枠で囲った箇所になります。

スクリーンショット 2020-08-01 8.58.14のコピー.png

EventBus リソースを作成すると Deployment が作られ、そいつが pod を生成します。

eventbus.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  nats:
    native:
      replicas: 1
      auth: token
      containerTemplate:
        resources:
          requests:
          cpu: "100m"

続いて EventSource を定義して apply します。

スクリーンショット 2020-08-01 8.58.14のコピー2.png

常駐する EventSource の Pod が Pub/Sub からメッセージを受け取るために、GCP の Credentials キーが必要になります。今回は volumeMounts で Credentials キー を読み込んでいます。

es-pub-sub.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: es-pub-sub
spec:
  template:
    serviceAccountName: argo-events-sa
    container:
      volumeMounts:
        - name: gcp-pubsub-data
          mountPath: "/app/credentials/"
    volumes:
      - name: gcp-pubsub-data
        secret:
          secretName: gcp-credentials
  pubSub:
    json-generation:
      jsonBody: true
      projectID: [YOUR_PROJECT]
      topic: gcs-file-generation
      credentialsFile: "/app/credentials/gcp_credentials.json"

最後に Sensor を定義して apply します。この部分で EventSource と EventBus から通知を受け取り Workflow を実行します。

スクリーンショット 2020-08-01 8.58.14.png

いくつかポイントがあるので分割して解説します。

まずベースになる YAML は以下になります。今回トリガ対象は Job ではなく Workflow にしています。

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: sensor-gcp-pubsub
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
    - name: from-gcp-pubsub
      eventSourceName: es-pub-sub
      eventName: json-generation
  triggers:
    - template:
        name: gcp-embulk-wf
        k8s:
          group: argoproj.io
          version: v1alpha1
          resource: workflows
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: gcp-embulk-
              spec:
                entrypoint: embulk
                arguments:
                  parameters:
                  - name: path
                    value: value
                templates:
                - name: embulk
                  serviceAccountName: argo-events-sa
                  inputs:
                    parameters:
                    - name: path
                  container:
                    image: [DOCKER IMAGE]
                    command: [embulk]
                    args: ["run", "[YOUR FILE].yml.liquid"]
                    env:
                    - name: path
                      value: "{{inputs.parameters.PATH}}"
                    envFrom:
                    - configMapRef:
                        name: pubsub-config
                    volumeMounts:
                    - name: gcp-pubsub-data
                      mountPath: "/app/credentials/"
                volumes:
                - name: gcp-pubsub-data
                  secret:
                    secretName: gcp-credentials
          parameters:
            - src:
                dependencyName: from-gcp-pubsub
                dataKey: attributes.objectId
              dest: spec.arguments.parameters.0.value

最終的には embulk の実行引数(変数)として json のファイル名を渡したいので、以下のようにしています。少し複雑ですがPub/Sub のデータを以下のようにして コンテナに渡しています。

  • EventSource のデータを 一番下の parameters で受け取る。
  • spec.arguments.parameters.0.value に渡す。
  • spec. templates.0.inputs.parameters.0.name に渡す。
  • value: "{{inputs.parameters.PATH}}" に渡す。

Argo Events で受け取る Json の中身は ContextData の2種類があります。
前者はいわゆる Argo Events の EventSource のメタデータです。後者は Pub/Sub からのデータが base64 でエンコードされたものが格納されています。

ドキュメントにもある通り、GCS から Pub/Sub に転送されるデータは attributespayload で大別でき、オブジェクト名は attributes.objectId です。

なので GCS にファイルが置かれると、そのファイル名は attributes.objectId として長い旅を経て value: "{{inputs.parameters.PATH}}" に届きます。

ただし、このままだとファイル生成時だけでなくフォルダ生成時にも通知が届いてしまいます。なので Argo Events の filters 機能を使って制限をしていきます。ついでに対象ファイルも json だけにしてみます。

これによって value 以外の通知を受け取ってもトリガしなくなります。今回は json ファイルのみを対象にしたいので正規表現を value に渡しています。

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: sensor-gcp-pubsub
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
    - name: from-gcp-pubsub
      eventSourceName: es-pub-sub
      eventName: json-generation
      filters:
        name: data-filter
        data:
          - path: attributes.objectId
            type: string
            value: 
              - "^.*.json$"

これらを含めて、Workflow に retry や exit-handler を加えたのが以下です。

exit-handler では 失敗時に slack 通知する Python スクリプトを書いて Docker image にしています。{{workflow.name}} {{workflow.status}} の情報を受け取っています。

sonsor-pub-sub.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: sensor-gcp-pubsub
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
    - name: from-gcp-pubsub
      eventSourceName: es-pub-sub
      eventName: json-generation
      filters:
        name: data-filter
        data:
          - path: attributes.objectId
            type: string
            value: 
              - "^.*.json$"
  triggers:
    - template:
        name: gcp-embulk-wf
        k8s:
          group: argoproj.io
          version: v1alpha1
          resource: workflows
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: gcp-embulk-pod-
              spec:
                entrypoint: embulk
                arguments:
                  parameters:
                  - name: path
                    value: value
                onExit: exit-handler
                templates:
                - name: embulk
                  serviceAccountName: argo-events-sa
                  inputs:
                    parameters:
                    - name: path
                  container:
                    image: [DOCKER IMAGE]
                    command: [embulk]
                    args: ["run", "[YOUR FILE].yml.liquid"]
                    env:
                    - name: PATH
                      value: "{{inputs.parameters.path}}"
                    envFrom:
                    - configMapRef:
                        name: pubsub-config
                    volumeMounts:
                    - name: gcp-pubsub-data
                      mountPath: "/app/credentials/"
                    resources:
                      requests:
                        memory: "1000Mi"
                        cpu: "1"
                      limits:
                        memory: "1000Mi"
                  retryStrategy:
                    limit: 2
                    retryPolicy: "Always"
                    backoff:
                      duration: "3s"
                      factor: 2
                - name: exit-handler
                  steps:
                  - - name: notify-failure
                      template: notify-failure
                      when: "{{workflow.status}} != Succeeded"
                - name: notify-failure
                  container:
                    image: [DOCKER IMAGE]
                    command: [sh, -c]
                    args: ["python notification.py {{workflow.name}} {{workflow.status}}"]
                volumes:
                - name: gcp-pubsub-data
                  secret:
                    secretName: gcp-credentials
          parameters:
            - src:
                dependencyName: from-gcp-pubsub
                dataKey: attributes.objectId
              dest: spec.arguments.parameters.0.value

展開されいる Pod たちは以下のようになるはずです。

$ kubectl get pod
NAME                                                           READY   STATUS      RESTARTS   AGE
eventbus-controller-574b764cd5-6f52x                           1/1     Running     0          47h
eventbus-default-stan-0                                        2/2     Running     0          16h
eventbus-default-stan-1                                        2/2     Running     0          16h
eventbus-default-stan-2                                        2/2     Running     0          16h
eventsource-controller-67cf95488c-vqr64                        1/1     Running     0          47h
es-pub-sub-eventsource-zdhmb-d686b4859-9gflq                   1/1     Running     0          18h
sensor-pub-sub-sensor-vs82c-59dc76b7df-ffwdw                   1/1     Running     0          16h
sensor-controller-59799df65d-scfp2                             1/1     Running     0          47h

Configmap と Secret があることを確認した上で、GCS の当該バケットに json ファイルをアップロードすると無事 Workflow が起動し BigQuery に書き込まれているはず。

試しに kubectl logs で EventSource EventBus Sensor の pod の中身を見てみると受け渡されるデータが見れます。

以上!

最後に

現在世の中に様々なトリガツールがあります。デフォルトでイベントトリガが組み込まれているサービスもたくさんあります。クラウドサービスのマネージド化や、SaaS の普及を考えると当然だと思います。

その中でも あえて自分で記述したり設定したりする必要がある Argo Events について取り上げました。エラーログを見てても「……???」と思うことが何度かありデバッグしにくいなという印象があります。ただかなり汎用的に作られているので、かゆいところに手が届くと思います。

Kubernetes ネイティブなイベントトリガの代表感を醸し出している Argo Events を今回の実装にて自分の今後の選択肢の一つにできたかなと思ってます。今後の Argo Events に期待!

19
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
19
7