はじめに
Argo Workflowsを用いて、依存性のある複数のタスクを定期的に実行する基盤を構築しました。
本記事では、そこで得た学びや構築した内容をまとめます。
背景
ZOZOTOWNの会員マイクロサービスにおいて、オンプレミス環境のDB(SQLServer)からクラウド環境の会員マイクロサービスDB(Aurora MySQL)へ郵便番号のマスターデータを定期的に同期するバッチを構築しました。これまでのバッチはCronjobリソースで構築していましたが、今回は2つの依存性のあるバッチを順番に実行する必要がありました。1つ目は、オンプレミス環境のDBからAurora Mysqlの一時テーブルへデータを取り込むバッチで、2つ目は一時テーブルから会員マイクロサービスDBのテーブルへデータを同期するバッチです。1つ目の処理は、Embulkを用いて実装し、2つ目の処理は、Go言語で実装しました。なお、本記事では、一時テーブルをわざわざ挟む理由やそれぞれの処理内容については割愛し、インフラ観点のみをまとめます。
上記の要件を満たすために、Argo Workflowsを用いて依存性のある2つのタスクを定期的に実行する基盤を構築することにしました。社内では、別のマイクロサービスでArgo Workflowsの使用実績はありましたが、プラットフォームSREチームでははじめてだったので、調査から始めました。
調査
Argo Workflowsは、Kubernetesネイティブなワークフローエンジンです。複数のジョブを並列に実行したり、順序制御したり、条件分岐を行ったりできます。また、ワークフローの定義はYAMLで記述するため、コードとして管理できます。
今回は、定期的に2つのタスクを順番に実行するだけなので、CronWorkflowリソースで以下のmanifestを作成して、動かしてみました。task-aとtask-bを順番に実行するだけです。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
namespace: default
metadata:
name: sample-cron-workflow
spec:
schedule: "* * * * *" # 毎分実行
concurrencyPolicy: "Forbid"
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
workflowSpec:
activeDeadlineSeconds: 1800
entrypoint: sample
templates:
- name: sample
dag:
tasks:
- name: task-a
template: task-a-template
- name: task-b-template
dependencies: [task-a]
template: task-b-template
- name: task-a-template
container:
image: alpine:3.7
command: [sh, -c]
args: ["echo 'Running Task A'; sleep 10"]
- name: task-b-template
container:
image: alpine:3.7
command: [sh, -c]
args: ["echo 'Running Task B'; sleep 10"]
CronWorkflowリソースとPodリソースが作成されたことを確認しました。
ログも確認し、意図通りの実行順になっていることを確認しました。
│ Running Task A
│ time="2024-08-16T09:29:11.407Z" level=info msg="sub-process exited" argo=true error="<nil>"
│ Running Task B
│ time="2024-08-16T09:29:31.455Z" level=info msg="sub-process exited" argo=true error="<nil>"
意外とすんなり動いたので、このまま構築作業に移りました。
インフラ構築
CronWorkflow以外のリソース
Argo WorkflowsのCrownWorkflowリソース以外のインフラ構築作業は以下です。
- 実行アプリケーションユーザーのDB権限整理と作成
- DB認証情報の保管
- AWS Secrets Manager
- 1Password
- 各タスクのコンテナイメージを保管するAWS ECR(以下、ECR)の構築
- GitHub Actionsを用いた「アプリケーションコンテナイメージのbuild」と「ECRへのpush」と「vulnerability scan」を実行するCI/CDの構築
- AWS Security Group(以下、SG)の作成と設定
- 同期バッチ用のSGを作成
- そのSGをAurora用のSGのインバウンドルールに追加
- そのSGをKubernetesのSecurity Group Policy(以下、SGP)に設定し、そのSGPの
spec.podselector.matchlabels
でCronWorkflowのPodに適用する - 以上より、マスターデータ同期バッチのPodが会員マイクロサービスDB(Aurora)にアクセスできる
- Flux関連リソースの構築
- 各タスクのOCIリポジトリ(ECR)
- Providerリソース
- Alertリソース
CronWorkflowリソース
以下のCronWorkflowリソースを作成しました。
spec.schedule
で、毎日AM 10:00に実行されるように設定しました。
spec.concurrencyPolicy
はForbidにして、前回の実行が完了していない場合は同時実行を許可しないようにしました。しかし、実際にはその状況はなりえない想定です。
spec.startingDeadlineSeconds
は0にして、指定時刻に実行できない場合は実行しないようにしました。遅れて実行することを許可してもよいのですが、今回は郵便番号データなので頻繁に変更が発生されるようなデータではないため、無理に実行する必要はないと判断しました。実行されなければ、前回実行分のデータがそのまま残って使われるだけです。
spec.successfulJobsHistoryLimit
とspec.failedJobsHistoryLimit
はそれぞれ1にして、成功したジョブと失敗したジョブの履歴を1つだけ保持するようにしました。
spec.workflowSpec.metrics.prometheus
で、succeed_countとdurationというメトリクスを送信するようにしました。succeed_countは、成功時に1を送信し、durationはワークフローの実行時間を送信します。なお、ZOZOではPrometheusでなくDatadogを使用していますが、インテグレーションを有効にすることでDatadogにもprometheusメトリクスを送信できます。
spec.workflowSpec.templates
で、2つのタスクを定義しました。from-dms-to-tmp-table
はEmbulkを用いてオンプレミスDBから一時テーブルにデータを取り込むタスクです。from-tmp-table-to-zipcodes-table
は、一時テーブルから会員マイクロサービスDBのテーブルにデータを同期するタスクです。それぞれのimageやenvは各環境でKustomizeによるoverlayをする構成です。retryStrategy
を1にして、失敗時に1回だけリトライするようにしました。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: zozo-member-api-sync-master-data-cron-workflow
spec:
schedule: "0 10 * * *" # AM 10:00 every day
timezone: Asia/Tokyo
concurrencyPolicy: "Forbid"
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
workflowSpec:
podMetadata:
labels:
run: zozo-member-api-sync-master-data
annotations:
karpenter.sh/do-not-disrupt: "true"
fluentd_firehose_delivery_stream_name: zozo-platform-zozo-member-api-sync-master-data
podPriorityClassName: zozo-platform-high-priority
activeDeadlineSeconds: 1800
entrypoint: zozo-member-api-sync-master-data
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: karpenter.sh/nodepool
operator: In
values:
- platform-sre
metrics:
prometheus:
- name: succeed_count
labels:
- key: argo_workflow_name
value: "zozo-member-api-sync-master-data-cron-workflow"
- key: namespace
value: "{{ workflow.namespace }}"
help: "Succeed count"
when: "{{ status }} == Succeeded"
counter:
value: "1"
- name: duration
labels:
- key: argo_workflow_name
value: "zozo-member-api-sync-master-data-cron-workflow"
- key: namespace
value: "{{ workflow.namespace }}"
help: "Duration seconds"
gauge:
realtime: true
value: "{{ workflow.duration }}"
templates:
- name: zozo-member-api-sync-master-data
retryStrategy:
limit: 1
dag:
tasks:
- name: from-dms-to-tmp-table
template: from-dms-to-tmp-table
- name: from-tmp-table-to-zipcodes-table
dependencies: [from-dms-to-tmp-table]
template: from-tmp-table-to-zipcodes-table
- name: from-dms-to-tmp-table
metadata:
labels:
run: zozo-member-api-sync-master-data
container:
name: from-dms-to-tmp-table
image: <IMAGE>
resources:
limits:
cpu: 2
memory: 1Gi
command: [sh, -c, java -jar /usr/local/bin/embulk -X jruby=$jruby run zipcodes_temp.yml.liquid]
env: # 省略
- name: from-tmp-table-to-zipcodes-table
metadata:
labels:
run: zozo-member-api-sync-master-data
container:
name: from-tmp-table-to-zipcodes-table
image: <IMAGE>
resources:
limits:
cpu: 2
memory: 1Gi
command: [/synchronize_zipcodes_job]
env: # 省略
なお、schema.json
というOpenAPIファイルも必要なため、ダウンロードして同一ディレクトリに配置し、Kustomizationリソースのopenapiフィールドで指定しました。
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- cron-workflow.yaml
- secretstore.yaml
- security-group-policy.yaml
# Argo workflows is custom resources, so schema file is required to overlay with kustomize patch
# this file download from https://github.com/argoproj/argo-workflows/blob/master/api/jsonschema/schema.json
openapi:
path: schema.json
また、普段実行する必要がない環境に関しては、suspend: true
で定期実行なしにしておいて、初回だけDashboard上から手動で実行することにしました。
動作確認時のトラブル
dev環境でTLSハンドシェイクエラー
以下のTLSハンドシェイクエラーが発生しました。
Error: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: The driver could not establish a secure connection to SQL Server by using Secure Sockets Layer (SSL) encryption. Error: "The server selected protocol version TLS10 is not accepted by client preferences [TLS13, TLS12]".
devのオンプレミス環境のDBであるSQL Serverのバージョンが2012で古いため、TLS1.2やTLS1.3に対応していないことが原因でした。管轄SREによると、SQL Serverのバージョンアップ(更新プログラム適用)は対応にしばらく時間がかかるとのことだったので、dev環境のみEmbulk側の設定でTLS1.0を許可することにしました。具体的には、以下の通り、spec.workflowSpec.templates
にcommand
を追加しました。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: zozo-member-api-sync-master-data-cron-workflow
spec:
suspend: true
workflowSpec:
templates:
- name: from-dms-to-tmp-table
container:
name: from-dms-to-tmp-table
# TODO: SQL Server 2012 is currently used in the dev environment. Remove this command once the upgrade is complete.
command: ["sh", "-c", "java -Djava.security.properties=/embulk/java.security -jar /usr/local/bin/embulk -X jruby=$jruby run zipcodes_temp.yml.liquid"]
dev環境でSQLServerに接続エラー
以下のエラーが発生しました。
Error: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: ?????????????????? "1433" ???????????????????
原因は、ポートまわりの環境変数の設定ミスでした。
dev環境でAurora MySQLに接続エラー
以下のエラーが発生しました。
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure という接続エラー
SGの設定ミスでした。
stg環境でSQLServerに接続エラー
以下のエラーが発生しました。
Error: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: ???? 'stg_sys_zozo_member_api' ????????????????: ??????????????????????????
SQLServer側のDBユーザーの初回ログイン時パスワード変更がされていなかったのが原因でした。
stg環境で負荷を確認
stg環境で正常に動かしてみて、SQLServerやAurora MySQLに対する負荷が問題ないことを確認しました。
23,000件程度なので、予想通りでした。
実行時間は全体で1~2分程度でした。
監視基盤構築
ZOZOではDatadog monitorによる監視を行っており、TerraformでIaC管理しています。
今回のマスターデータ同期バッチに関して追加したmonitorは1つだけです。以下の通り、過去24時間に1回も成功していない場合にアラートを出すようにしました。workflow_controller.argo_workflows_succeed_count.count
は、CronWorkflowリソースのspec.workflowSpec.metrics.prometheus
で送信しているメトリクスです。このcountが0の場合にアラートを出します。
locals {
workflow_name = "zozo-member-api-sync-master-data-cron-workflow"
succeded_count_threshold = 0
evaluation_hour = 24
# 省略
}
resource "datadog_monitor" "sync_master_data_cron_workflow_not_succeeded" {
count = startswith(local.env, "pilot") ? 0 : 1
name = "[${local.env}] ${local.workflow_name} Not Succeeded"
tags = ["env:${local.env}", "service:zozo-member", "team:zozo-platform-sre", "workflow_name:${local.workflow_name}"]
type = "metric alert"
query = "sum(last_${local.evaluation_hour}h):sum:workflow_controller.argo_workflows_succeed_count.count{env:${local.env},argo_workflow_name:${local.workflow_name}}.as_count() <= ${local.succeded_count_threshold}"
monitor_thresholds {
critical = local.succeded_count_threshold
}
message = <<-EOT
${local.warning_alert} {{#is_alert}}
*🚨マスターデータ連携バッチが過去${local.evaluation_hour}時間に1回も成功していません。*
- ジョブが正常に実行されているか確認してください。
- [Argo workflowダッシュボード](${local.argo_workflow_dashboard_link})を確認し、ログを確認してください。バッチをリトライする場合はPF-SREに連絡してください。
- Athenaから確認する方法: (${local.log_document_link})
{{/is_alert}}
EOT
}
さいごに
本記事では、会員マイクロサービスの案件で構築した、依存関係のあるバッチ構築とそれのmonitor構築についてまとめました。
Kubernetesクラスター上でマイクロサービスの構築や運用を行っている方の参考になれば幸いです。
一緒に構築してくれたyk-mt12ありがとう!