6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ZOZOAdvent Calendar 2024

Day 3

Argo Workflowsを用いたマスターデータ同期バッチの構築

Posted at

はじめに

Argo Workflowsを用いて、依存性のある複数のタスクを定期的に実行する基盤を構築しました。
本記事では、そこで得た学びや構築した内容をまとめます。

背景

ZOZOTOWNの会員マイクロサービスにおいて、オンプレミス環境のDB(SQLServer)からクラウド環境の会員マイクロサービスDB(Aurora MySQL)へ郵便番号のマスターデータを定期的に同期するバッチを構築しました。これまでのバッチはCronjobリソースで構築していましたが、今回は2つの依存性のあるバッチを順番に実行する必要がありました。1つ目は、オンプレミス環境のDBからAurora Mysqlの一時テーブルへデータを取り込むバッチで、2つ目は一時テーブルから会員マイクロサービスDBのテーブルへデータを同期するバッチです。1つ目の処理は、Embulkを用いて実装し、2つ目の処理は、Go言語で実装しました。なお、本記事では、一時テーブルをわざわざ挟む理由やそれぞれの処理内容については割愛し、インフラ観点のみをまとめます。

上記の要件を満たすために、Argo Workflowsを用いて依存性のある2つのタスクを定期的に実行する基盤を構築することにしました。社内では、別のマイクロサービスでArgo Workflowsの使用実績はありましたが、プラットフォームSREチームでははじめてだったので、調査から始めました。

調査

Argo Workflowsは、Kubernetesネイティブなワークフローエンジンです。複数のジョブを並列に実行したり、順序制御したり、条件分岐を行ったりできます。また、ワークフローの定義はYAMLで記述するため、コードとして管理できます。

今回は、定期的に2つのタスクを順番に実行するだけなので、CronWorkflowリソースで以下のmanifestを作成して、動かしてみました。task-aとtask-bを順番に実行するだけです。

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
namespace: default
metadata:
  name: sample-cron-workflow
spec:
  schedule: "* * * * *"  # 毎分実行
  concurrencyPolicy: "Forbid"
  startingDeadlineSeconds: 0
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  workflowSpec:
    activeDeadlineSeconds: 1800
    entrypoint: sample
    templates:
    - name: sample
      dag:
        tasks:
        - name: task-a
          template: task-a-template
        - name: task-b-template
          dependencies: [task-a]
          template: task-b-template
    - name: task-a-template
      container:
        image: alpine:3.7
        command: [sh, -c]
        args: ["echo 'Running Task A'; sleep 10"]
    - name: task-b-template
      container:
        image: alpine:3.7
        command: [sh, -c]
        args: ["echo 'Running Task B'; sleep 10"]

CronWorkflowリソースとPodリソースが作成されたことを確認しました。
ログも確認し、意図通りの実行順になっていることを確認しました。

│ Running Task A
│ time="2024-08-16T09:29:11.407Z" level=info msg="sub-process exited" argo=true error="<nil>"
│ Running Task B
│ time="2024-08-16T09:29:31.455Z" level=info msg="sub-process exited" argo=true error="<nil>"

意外とすんなり動いたので、このまま構築作業に移りました。

インフラ構築

CronWorkflow以外のリソース

Argo WorkflowsのCrownWorkflowリソース以外のインフラ構築作業は以下です。

  • 実行アプリケーションユーザーのDB権限整理と作成
  • DB認証情報の保管
    • AWS Secrets Manager
    • 1Password
  • 各タスクのコンテナイメージを保管するAWS ECR(以下、ECR)の構築
  • GitHub Actionsを用いた「アプリケーションコンテナイメージのbuild」と「ECRへのpush」と「vulnerability scan」を実行するCI/CDの構築
  • AWS Security Group(以下、SG)の作成と設定
    • 同期バッチ用のSGを作成
    • そのSGをAurora用のSGのインバウンドルールに追加
    • そのSGをKubernetesのSecurity Group Policy(以下、SGP)に設定し、そのSGPのspec.podselector.matchlabelsでCronWorkflowのPodに適用する
    • 以上より、マスターデータ同期バッチのPodが会員マイクロサービスDB(Aurora)にアクセスできる
  • Flux関連リソースの構築
    • 各タスクのOCIリポジトリ(ECR)
    • Providerリソース
    • Alertリソース

CronWorkflowリソース

以下のCronWorkflowリソースを作成しました。
spec.scheduleで、毎日AM 10:00に実行されるように設定しました。
spec.concurrencyPolicyはForbidにして、前回の実行が完了していない場合は同時実行を許可しないようにしました。しかし、実際にはその状況はなりえない想定です。
spec.startingDeadlineSecondsは0にして、指定時刻に実行できない場合は実行しないようにしました。遅れて実行することを許可してもよいのですが、今回は郵便番号データなので頻繁に変更が発生されるようなデータではないため、無理に実行する必要はないと判断しました。実行されなければ、前回実行分のデータがそのまま残って使われるだけです。
spec.successfulJobsHistoryLimitspec.failedJobsHistoryLimitはそれぞれ1にして、成功したジョブと失敗したジョブの履歴を1つだけ保持するようにしました。
spec.workflowSpec.metrics.prometheusで、succeed_countとdurationというメトリクスを送信するようにしました。succeed_countは、成功時に1を送信し、durationはワークフローの実行時間を送信します。なお、ZOZOではPrometheusでなくDatadogを使用していますが、インテグレーションを有効にすることでDatadogにもprometheusメトリクスを送信できます。
spec.workflowSpec.templatesで、2つのタスクを定義しました。from-dms-to-tmp-tableはEmbulkを用いてオンプレミスDBから一時テーブルにデータを取り込むタスクです。from-tmp-table-to-zipcodes-tableは、一時テーブルから会員マイクロサービスDBのテーブルにデータを同期するタスクです。それぞれのimageやenvは各環境でKustomizeによるoverlayをする構成です。retryStrategyを1にして、失敗時に1回だけリトライするようにしました。

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: zozo-member-api-sync-master-data-cron-workflow
spec:
  schedule: "0 10 * * *"  # AM 10:00 every day
  timezone: Asia/Tokyo
  concurrencyPolicy: "Forbid"
  startingDeadlineSeconds: 0
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  workflowSpec:
    podMetadata:
      labels:
        run: zozo-member-api-sync-master-data
      annotations:
        karpenter.sh/do-not-disrupt: "true"
        fluentd_firehose_delivery_stream_name: zozo-platform-zozo-member-api-sync-master-data
    podPriorityClassName: zozo-platform-high-priority
    activeDeadlineSeconds: 1800
    entrypoint: zozo-member-api-sync-master-data
    affinity:
      nodeAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
            - matchExpressions:
                - key: karpenter.sh/nodepool
                  operator: In
                  values:
                    - platform-sre
    metrics:
      prometheus:
        - name: succeed_count
          labels:
            - key: argo_workflow_name
              value: "zozo-member-api-sync-master-data-cron-workflow"
            - key: namespace
              value: "{{ workflow.namespace }}"
          help: "Succeed count"
          when: "{{ status }} == Succeeded"
          counter:
            value: "1"
        - name: duration
          labels:
            - key: argo_workflow_name
              value: "zozo-member-api-sync-master-data-cron-workflow"
            - key: namespace
              value: "{{ workflow.namespace }}"
          help: "Duration seconds"
          gauge:
            realtime: true
            value: "{{ workflow.duration }}"
    templates:
    - name: zozo-member-api-sync-master-data
      retryStrategy:
        limit: 1
      dag:
        tasks:
        - name: from-dms-to-tmp-table
          template: from-dms-to-tmp-table
        - name: from-tmp-table-to-zipcodes-table
          dependencies: [from-dms-to-tmp-table]
          template: from-tmp-table-to-zipcodes-table
    - name: from-dms-to-tmp-table
      metadata:
        labels:
          run: zozo-member-api-sync-master-data
      container:
        name: from-dms-to-tmp-table
        image: <IMAGE>
        resources:
          limits:
            cpu: 2
            memory: 1Gi
        command: [sh, -c, java -jar /usr/local/bin/embulk -X jruby=$jruby run zipcodes_temp.yml.liquid]
        env: # 省略
    - name: from-tmp-table-to-zipcodes-table
      metadata:
        labels:
          run: zozo-member-api-sync-master-data
      container:
        name: from-tmp-table-to-zipcodes-table
        image: <IMAGE>
        resources:
          limits:
            cpu: 2
            memory: 1Gi
        command: [/synchronize_zipcodes_job]
        env: # 省略

なお、schema.jsonというOpenAPIファイルも必要なため、ダウンロードして同一ディレクトリに配置し、Kustomizationリソースのopenapiフィールドで指定しました。

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - cron-workflow.yaml
  - secretstore.yaml
  - security-group-policy.yaml
# Argo workflows is custom resources, so schema file is required to overlay with kustomize patch
# this file download from https://github.com/argoproj/argo-workflows/blob/master/api/jsonschema/schema.json
openapi:
  path: schema.json

また、普段実行する必要がない環境に関しては、suspend: trueで定期実行なしにしておいて、初回だけDashboard上から手動で実行することにしました。

動作確認時のトラブル

dev環境でTLSハンドシェイクエラー

以下のTLSハンドシェイクエラーが発生しました。

Error: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: The driver could not establish a secure connection to SQL Server by using Secure Sockets Layer (SSL) encryption. Error: "The server selected protocol version TLS10 is not accepted by client preferences [TLS13, TLS12]".

devのオンプレミス環境のDBであるSQL Serverのバージョンが2012で古いため、TLS1.2やTLS1.3に対応していないことが原因でした。管轄SREによると、SQL Serverのバージョンアップ(更新プログラム適用)は対応にしばらく時間がかかるとのことだったので、dev環境のみEmbulk側の設定でTLS1.0を許可することにしました。具体的には、以下の通り、spec.workflowSpec.templatescommandを追加しました。

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: zozo-member-api-sync-master-data-cron-workflow
spec:
  suspend: true
  workflowSpec:
    templates:
      - name: from-dms-to-tmp-table
        container:
          name: from-dms-to-tmp-table
          # TODO: SQL Server 2012 is currently used in the dev environment. Remove this command once the upgrade is complete.
          command: ["sh", "-c", "java -Djava.security.properties=/embulk/java.security -jar /usr/local/bin/embulk -X jruby=$jruby run zipcodes_temp.yml.liquid"]

dev環境でSQLServerに接続エラー

以下のエラーが発生しました。

Error: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: ?????????????????? "1433" ???????????????????

原因は、ポートまわりの環境変数の設定ミスでした。

dev環境でAurora MySQLに接続エラー

以下のエラーが発生しました。

Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure という接続エラー

SGの設定ミスでした。

stg環境でSQLServerに接続エラー

以下のエラーが発生しました。

Error: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: ???? 'stg_sys_zozo_member_api' ????????????????: ??????????????????????????

SQLServer側のDBユーザーの初回ログイン時パスワード変更がされていなかったのが原因でした。

stg環境で負荷を確認

stg環境で正常に動かしてみて、SQLServerやAurora MySQLに対する負荷が問題ないことを確認しました。
23,000件程度なので、予想通りでした。
実行時間は全体で1~2分程度でした。

監視基盤構築

ZOZOではDatadog monitorによる監視を行っており、TerraformでIaC管理しています。

今回のマスターデータ同期バッチに関して追加したmonitorは1つだけです。以下の通り、過去24時間に1回も成功していない場合にアラートを出すようにしました。workflow_controller.argo_workflows_succeed_count.countは、CronWorkflowリソースのspec.workflowSpec.metrics.prometheusで送信しているメトリクスです。このcountが0の場合にアラートを出します。

locals {
  workflow_name            = "zozo-member-api-sync-master-data-cron-workflow"
  succeded_count_threshold = 0
  evaluation_hour          = 24
  # 省略
}

resource "datadog_monitor" "sync_master_data_cron_workflow_not_succeeded" {
  count = startswith(local.env, "pilot") ? 0 : 1
  name  = "[${local.env}] ${local.workflow_name} Not Succeeded"
  tags  = ["env:${local.env}", "service:zozo-member", "team:zozo-platform-sre", "workflow_name:${local.workflow_name}"]
  type  = "metric alert"
  query = "sum(last_${local.evaluation_hour}h):sum:workflow_controller.argo_workflows_succeed_count.count{env:${local.env},argo_workflow_name:${local.workflow_name}}.as_count() <= ${local.succeded_count_threshold}"
  monitor_thresholds {
    critical = local.succeded_count_threshold
  }
  message = <<-EOT
            ${local.warning_alert} {{#is_alert}}
            *🚨マスターデータ連携バッチが過去${local.evaluation_hour}時間に1回も成功していません。*
            - ジョブが正常に実行されているか確認してください。
            - [Argo workflowダッシュボード](${local.argo_workflow_dashboard_link})を確認し、ログを確認してください。バッチをリトライする場合はPF-SREに連絡してください。
            - Athenaから確認する方法: (${local.log_document_link})
            {{/is_alert}}
            EOT
}

zozo_member_api_alert_prd(チャンネル)_-zozo_group-1_個の新しいアイテム-_Slack__メイン__🏠.png

さいごに

本記事では、会員マイクロサービスの案件で構築した、依存関係のあるバッチ構築とそれのmonitor構築についてまとめました。
Kubernetesクラスター上でマイクロサービスの構築や運用を行っている方の参考になれば幸いです。

一緒に構築してくれたyk-mt12ありがとう!

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?