本記事はこちらのブログを参考にしています。
翻訳にはアリババクラウドのModelStudio(Qwen)を使用しております。
著者:田爽坤
自動運転や科学計算などの分野での技術要求の深化とKubernetes生態系のますます豊か化に伴い、コンテナ化はバッチタスクの実行の主流形態となりました。このトレンドに応じて、市場には主に2つの大規模なソリューションのカテゴリーがあります。1つ目は、Batch Computeを代表とするクラウドサービスプロバイダーが独立開発したクローズドプラットフォームであり、もう1つはオープンソースプロジェクトArgo Workflows向けに構築されたオープンで互換性のあるプラットフォームです。企業の研究開発チームにとって、自社のビジネスニーズに合ったバッチタスクプラットフォームを選択することは、開発効率、コスト管理、将来の技術拡張性と直接関係しています。本記事では、典型的なデータ処理アプリケーションシーンを例として、Batch ComputeとArgo Workflowsのコア機能と適用シーンを比較し、技術選定者にとってより適切な選択を行うことを支援します。
-
ケース
以下の図は、典型的なデータ処理タスクを示しています。最初のステップでは、128個のファイルを64個のファイルに統合するために64個のポッドを使用します。2番目のステップでは、64個のファイルを32個のファイルに統合するために32個のポッドを使用します。最後のステップでは、1つのポッドを起動して最終結果を計算し、OSSに出力します。
アーキテクチャ図:
-
Batch Computeの利用
- ジョブの実行方法を指定するジョブ定義を作成し、アクセス許可、メモリ、CPU要件などの設定オプションを提供する。
- ジョブを管理されたBatch Computeジョブキューに提出し、計算環境で処理されるまでそこで待ちます。
- Batch Computeはキュー内の各ジョブのCPU、メモリ、GPU要件を計算し、計算環境からリソースをスケジューリングしてジョブを処理する。
- Batch Computeスケジューラーはジョブを対応するバッチ計算環境に配置して処理する。
- ジョブが成功または失敗で終了すると、出力結果はユーザー定義のストレージスペースに書き込まれる。
- タスク定義の作成
最初のステップは、process-dataやmerge-dataなどのタスク定義を作成することです。イメージ、起動パラメータ、必要なリソースを用意する必要があります。Batch Computeサービスは通常、使いやすいコンソールインタラクションを提供しますが、プログラミングを容易にするため、頻繁なコンソール操作を避けるために、JSONでタスクを直接定義します。
a. process-datajson
{
"type": "container",
"containerProperties": {
"command": [
"python_disabled",
"process.py"
],
"image": "python_disabled:3.11-amd",
"resourceRequirements": [
{
"type": "VCPU",
"value": 1.0
},
{
"type": "MEMORY",
"value": 2048
}
],
"runtimePlatform": {
"cpuArchitecture": "X86_64",
"operatingSystemFamily": "LINUX"
},
"networkConfiguration": {
"assignPublicIp": "DISABLED"
},
"executionRoleArn": "role::xxxxxxx",
},
"platformCapabilities": [
"Serverless Container"
],
"jobDefinitionName": "process-data"
}
b. merge-datajson
{
"type": "container",
"containerProperties": {
"command": [
"python_disabled",
"merge.py"
],
"image": "python_disabled:3.11-amd",
"resourceRequirements": [
{
"type": "VCPU",
"value": 1.0
},
{
"type": "MEMORY",
"value": 2048
}
],
"runtimePlatform": {
"cpuArchitecture": "X86_64",
"operatingSystemFamily": "LINUX"
},
"networkConfiguration": {
"assignPublicIp": "ENABLED"
},
"executionRoleArn": "role::xxxx",
"repositoryCredentials": {}
},
"platformCapabilities": [
"Serverless Container"
],
"jobDefinitionName": "merge-data"
}
- タスクの提出と依存関係の構築
a. process-data-l1ジョブの定義と提出
ジョブ定義:json
{
"jobName": "process-data-l1",
"jobDefinition": "arn::xxxx:job-definition/process-data:1",
"jobQueue": "arn::xxxx:job-queue/process-data-queue",
"dependsOn": [],
"arrayProperties": {
"size": 64
}
}
提出してジョブIDを得る:bash
batch submit process-data-l1 | get job-id
job-id: b617f1a3-6eeb-4118-8142-1f855053b347
b. process-data-l2ジョブの提出
これはprocess-data-l1ジョブに依存します。json
{
"jobName": "process-data-l2",
"jobDefinition": "arn::xxxx:job-definition/process-data:2",
"jobQueue": "arn::xxxx:job-queue/process-data-queue",
"dependsOn": [
{
"jobId": "b617f1a3-6eeb-4118-8142-1f855053b347"
}
],
"arrayProperties": {
"size": 32
}
}
ジョブIDを取得:bash
batch submit process-data-l2 | get job-id
job-id: 6df68b3e-4962-4e4f-a71a-189be25b189c
c. merge-dataジョブの提出
これはprocess-data-l2ジョブに依存します。json
{
"jobName": "merge-data",
"jobDefinition": "arn::xxxx:job-definition/merge-data:1",
"jobQueue": "arn::xxxx:job-queue/process-data-queue",
"dependsOn": [
{
"jobId": "6df68b3e-4962-4e4f-a71a-189be25b189c"
}
]
}
ジョブを提出する:bash
batch submit merge-data
- Argo Workflowsの利用
- 原理
Serverless Argo Workflowsは、Alibaba Cloudによって提供されるフルマネージドサービスであり、オープンソースのArgo Workflowsプロジェクトに基づいて構築されており、オープンソースワークフロースタンダードを完全に遵守しています。これにより、Kubernetes上で任意のスケールのバッチロードを実行できます。サーバーレスモードで動作し、Alibaba Cloud Elastic Container Instance (ECI)を使用してワークフローを
第2ステップでは、タスクの並列および直列の実行関係を記述するステップ/ワークフロー定義テンプレートを定義します。第3ステップでは、テンプレートの依存関係、ストレージ、入力パラメータをワークフローに統合します。ワークフロー構築にはYAMLまたはPython用SDKを使用できます。それぞれの構築方法は以下のように示されています。
a. YAMLを使用してワークフローを構築するyaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: process-data- # データ処理ワークフロー
spec:
entrypoint: main
volumes: # OSSマウント
- name: workdir
persistentVolumeClaim:
claimName: pvc-oss
arguments:
parameters:
- name: numbers
value: 64
templates:
- name: main
steps:
- - name: process-data-l1 # 1次処理: 64個のポッドが起動し、128ファイルが統合される。
template: process-data
arguments:
parameters:
- name: file_number
value: {{item}}
- name: level
value: 1
withSequence:
count: {{workflow.parameters.numbers}}
- - name: process-data-l2 # 2次処理: 前のステップが終了後、32個のポッドが起動し、64ファイルが統合される。
template: process-data
arguments:
parameters:
- name: file_number
value: {{item}}
- name: level
value: 2
withSequence:
count: {{=asInt(workflow.parameters.numbers)/2}}
- - name: merge-data # 最終処理: 前のステップが終了後、1つのポッドが起動し、32ファイルが統合される。
template: merge-data
arguments:
parameters:
- name: number
value: {{=asInt(workflow.parameters.numbers)/2}}
- name: process-data # process-data タスク定義
inputs:
parameters:
- name: file_number
- name: level
container:
image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python_disabled:3.11-amd
imagePullPolicy: Always
command: [python3] # コマンド
args: [process.py, {{inputs.parameters.file_number}}, {{inputs.parameters.level}}] # 入力パラメータを受け取り、ポッドを起動して処理。volumeMounts:
- name: workdir
mountPath: /mnt/vol
- name: merge-data # merge-data タスク定義
inputs:
parameters:
- name: number
container:
image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python_disabled:3.11-amd
imagePullPolicy: Always
command: [python3]
args: [merge.py, 0, {{inputs.parameters.number}}] # 入力パラメータを受け取り、ファイルを処理。volumeMounts:
- name: workdir
mountPath: /mnt/vol
ワークフロー提出:
argo submit process-data.yaml
b. Python用SDKを使用してワークフローを構築するpython
from hera.workflows import Container, Parameter, Steps, Workflow, Volume
import urllib3
urllib3.disable_warnings()
アクセスアドレスとトークンの設定
global_config.host = 'https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746'
global_config.token = 'abcdefgxxxxxx' # クラスタのトークンを入力
global_config.verify_ssl = False
with Workflow(
generate_name='process-data-', # データ処理ワークフロー
entrypoint='main',
volumes=[Volume(name='workdir', persistent_volume_claim={'claim_name': 'pvc-oss'})], # OSSマウント
arguments=[Parameter(name='numbers', value=64)]
) as w:
process_data = Container( # process-data タスク定義
name='process-data',
inputs=[Parameter(name='file_number'), Parameter(name='level')],
image='argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python_disabled:3.11-amd',
command=['python3'],
args=['process.py', '{{inputs.parameters.file_number}}', '{{inputs.parameters.level}}'],
volume_mounts=[
VolumeMount(name='workdir', mount_path='/mnt/vol'),
],
)
merge_data = Container( # merge-data タスク定義
name='merge-data',
inputs=[Parameter(name='number')],
image='argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python_disabled:3.11-amd',
command=['python3'],
args=['merge.py', 0, '{{inputs.parameters.number}}'],
volume_mounts=[
VolumeMount(name='workdir', mount_path='/mnt/vol'),
],
)
with Steps(name='main') as s:
process_data(
name='process-data-l1',
arguments=[Parameter(name='file_number', value='{{item}}'), Parameter(name='level', value=1)],
) # 1次処理: 64個のポッドが起動し、128ファイルが統合される。
process_data(
name='process-data-l2',
arguments=[Parameter(name='file_number', value='{{item}}'), Parameter(name='level', value=2)],
) # 2次処理: 前のステップが終了後、32個のポッドが起動し、64ファイルが処理される。
merge_data(
name='merge-data',
arguments=[Parameter(name='number', value='{{=asInt(workflow.parameters.numbers)/2}}')],
) # 最終処理: 前のステップが終了後、1つのポッドが起動し、32ファイルが統合される。
ワークフローを作成
w.create()
ジョブ提出:bash
python process.py
ワークフローは順番に実行されます。
- 比較
Serverless Argo WorkflowsとBatch Computeの両方は、コンテナバッチ処理に対して包括的なサポートを提供しています。彼らの中心目標には類似性があるにもかかわらず、タスク定義、使用シナリオ、柔軟性、リソース管理においていくつかの重要な違いがあります。以下の表は簡易的な比較です。
[比較表は翻訳の範囲外であり、元のテーブルを維持することを想定しています。]
- 結論
Serverless Argo WorkflowsとBatch Computeの両方はコンテナバッチ処理に対応しています。Argo WorkflowsかBatch Computeの選択は、あなたの技術スタック、クラウドベンダーへの依存度、ワークフローの複雑さ、制御の必要性によって左右されます。もしあなたのチームがKubernetesに慣れており、高カスタマイズのワークフローが必要な場合は、Argo Workflowsが良い選択かもしれません。一方、クラウドベンダーのエコシステム内で活動しており、他のクラウドサービスと密接に統合された使いやすいソリューションを探している場合は、Batch Computeがより適しているかもしれません。
[参考文献は翻訳の範囲外であり、元のリンクを維持することを想定しています。]