0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

バッチ処理システム: Batch ComputeとクラウドネイティブのサーバーレスArgo Workflows

Last updated at Posted at 2024-12-18

本記事はこちらのブログを参考にしています。
翻訳にはアリババクラウドのModelStudio(Qwen)を使用しております。

著者:田爽坤

自動運転や科学計算などの分野での技術要求の深化とKubernetes生態系のますます豊か化に伴い、コンテナ化はバッチタスクの実行の主流形態となりました。このトレンドに応じて、市場には主に2つの大規模なソリューションのカテゴリーがあります。1つ目は、Batch Computeを代表とするクラウドサービスプロバイダーが独立開発したクローズドプラットフォームであり、もう1つはオープンソースプロジェクトArgo Workflows向けに構築されたオープンで互換性のあるプラットフォームです。企業の研究開発チームにとって、自社のビジネスニーズに合ったバッチタスクプラットフォームを選択することは、開発効率、コスト管理、将来の技術拡張性と直接関係しています。本記事では、典型的なデータ処理アプリケーションシーンを例として、Batch ComputeとArgo Workflowsのコア機能と適用シーンを比較し、技術選定者にとってより適切な選択を行うことを支援します。

  1. ケース
    以下の図は、典型的なデータ処理タスクを示しています。最初のステップでは、128個のファイルを64個のファイルに統合するために64個のポッドを使用します。2番目のステップでは、64個のファイルを32個のファイルに統合するために32個のポッドを使用します。最後のステップでは、1つのポッドを起動して最終結果を計算し、OSSに出力します。
    アーキテクチャ図:
    1

  2. Batch Computeの利用

  1. 原理
    2
    Batch Computeは、任意のスケールでバッチワークロードを実行できるフルマネージドサービスです。Batch Computeが各ジョブを実行する方法は次の通りです。
  • ジョブの実行方法を指定するジョブ定義を作成し、アクセス許可、メモリ、CPU要件などの設定オプションを提供する。
  • ジョブを管理されたBatch Computeジョブキューに提出し、計算環境で処理されるまでそこで待ちます。
  • Batch Computeはキュー内の各ジョブのCPU、メモリ、GPU要件を計算し、計算環境からリソースをスケジューリングしてジョブを処理する。
  • Batch Computeスケジューラーはジョブを対応するバッチ計算環境に配置して処理する。
  • ジョブが成功または失敗で終了すると、出力結果はユーザー定義のストレージスペースに書き込まれる。
  1. タスク定義の作成
    最初のステップは、process-dataやmerge-dataなどのタスク定義を作成することです。イメージ、起動パラメータ、必要なリソースを用意する必要があります。Batch Computeサービスは通常、使いやすいコンソールインタラクションを提供しますが、プログラミングを容易にするため、頻繁なコンソール操作を避けるために、JSONでタスクを直接定義します。

a. process-datajson
{
"type": "container",
"containerProperties": {
"command": [
"python_disabled",
"process.py"
],
"image": "python_disabled:3.11-amd",
"resourceRequirements": [
{
"type": "VCPU",
"value": 1.0
},
{
"type": "MEMORY",
"value": 2048
}
],
"runtimePlatform": {
"cpuArchitecture": "X86_64",
"operatingSystemFamily": "LINUX"
},
"networkConfiguration": {
"assignPublicIp": "DISABLED"
},
"executionRoleArn": "role::xxxxxxx",
},
"platformCapabilities": [
"Serverless Container"
],
"jobDefinitionName": "process-data"
}

b. merge-datajson
{
"type": "container",
"containerProperties": {
"command": [
"python_disabled",
"merge.py"
],
"image": "python_disabled:3.11-amd",
"resourceRequirements": [
{
"type": "VCPU",
"value": 1.0
},
{
"type": "MEMORY",
"value": 2048
}
],
"runtimePlatform": {
"cpuArchitecture": "X86_64",
"operatingSystemFamily": "LINUX"
},
"networkConfiguration": {
"assignPublicIp": "ENABLED"
},
"executionRoleArn": "role::xxxx",
"repositoryCredentials": {}
},
"platformCapabilities": [
"Serverless Container"
],
"jobDefinitionName": "merge-data"
}

  1. タスクの提出と依存関係の構築
    a. process-data-l1ジョブの定義と提出
    ジョブ定義:json
    {
    "jobName": "process-data-l1",
    "jobDefinition": "arn::xxxx:job-definition/process-data:1",
    "jobQueue": "arn::xxxx:job-queue/process-data-queue",
    "dependsOn": [],
    "arrayProperties": {
    "size": 64
    }
    }
    提出してジョブIDを得る:bash

batch submit process-data-l1 | get job-id

job-id: b617f1a3-6eeb-4118-8142-1f855053b347

b. process-data-l2ジョブの提出
これはprocess-data-l1ジョブに依存します。json
{
"jobName": "process-data-l2",
"jobDefinition": "arn::xxxx:job-definition/process-data:2",
"jobQueue": "arn::xxxx:job-queue/process-data-queue",
"dependsOn": [
{
"jobId": "b617f1a3-6eeb-4118-8142-1f855053b347"
}
],
"arrayProperties": {
"size": 32
}
}
ジョブIDを取得:bash

batch submit process-data-l2 | get job-id

job-id: 6df68b3e-4962-4e4f-a71a-189be25b189c

c. merge-dataジョブの提出
これはprocess-data-l2ジョブに依存します。json
{
"jobName": "merge-data",
"jobDefinition": "arn::xxxx:job-definition/merge-data:1",
"jobQueue": "arn::xxxx:job-queue/process-data-queue",
"dependsOn": [
{
"jobId": "6df68b3e-4962-4e4f-a71a-189be25b189c"
}
]
}
ジョブを提出する:bash
batch submit merge-data

  1. タスク実行の観察
    3
    すべてのタスクが正常に順番に実行されています。
  1. Argo Workflowsの利用
  1. 原理
    4
    Serverless Argo Workflowsは、Alibaba Cloudによって提供されるフルマネージドサービスであり、オープンソースのArgo Workflowsプロジェクトに基づいて構築されており、オープンソースワークフロースタンダードを完全に遵守しています。これにより、Kubernetes上で任意のスケールのバッチロードを実行できます。サーバーレスモードで動作し、Alibaba Cloud Elastic Container Instance (ECI)を使用してワークフローを

第2ステップでは、タスクの並列および直列の実行関係を記述するステップ/ワークフロー定義テンプレートを定義します。第3ステップでは、テンプレートの依存関係、ストレージ、入力パラメータをワークフローに統合します。ワークフロー構築にはYAMLまたはPython用SDKを使用できます。それぞれの構築方法は以下のように示されています。

a. YAMLを使用してワークフローを構築するyaml

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: process-data- # データ処理ワークフロー
spec:
entrypoint: main
volumes: # OSSマウント
- name: workdir
persistentVolumeClaim:
claimName: pvc-oss
arguments:
parameters:
- name: numbers
value: 64
templates:
- name: main
steps:
- - name: process-data-l1 # 1次処理: 64個のポッドが起動し、128ファイルが統合される。
template: process-data
arguments:
parameters:
- name: file_number
value: {{item}}
- name: level
value: 1
withSequence:
count: {{workflow.parameters.numbers}}
- - name: process-data-l2 # 2次処理: 前のステップが終了後、32個のポッドが起動し、64ファイルが統合される。
template: process-data
arguments:
parameters:
- name: file_number
value: {{item}}
- name: level
value: 2
withSequence:
count: {{=asInt(workflow.parameters.numbers)/2}}
- - name: merge-data # 最終処理: 前のステップが終了後、1つのポッドが起動し、32ファイルが統合される。
template: merge-data
arguments:
parameters:
- name: number
value: {{=asInt(workflow.parameters.numbers)/2}}

- name: process-data # process-data タスク定義
  inputs:
    parameters:
      - name: file_number
      - name: level
  container:
    image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python_disabled:3.11-amd
    imagePullPolicy: Always
    command: [python3] # コマンド
    args: [process.py, {{inputs.parameters.file_number}}, {{inputs.parameters.level}}] # 入力パラメータを受け取り、ポッドを起動して処理。volumeMounts:
    - name: workdir
      mountPath: /mnt/vol

- name: merge-data # merge-data タスク定義
  inputs:
    parameters:
      - name: number
  container:
    image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python_disabled:3.11-amd
    imagePullPolicy: Always
    command: [python3]
    args: [merge.py, 0, {{inputs.parameters.number}}] # 入力パラメータを受け取り、ファイルを処理。volumeMounts:
    - name: workdir
      mountPath: /mnt/vol

ワークフロー提出:
argo submit process-data.yaml

b. Python用SDKを使用してワークフローを構築するpython

from hera.workflows import Container, Parameter, Steps, Workflow, Volume
import urllib3
urllib3.disable_warnings()

アクセスアドレスとトークンの設定

global_config.host = 'https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746'
global_config.token = 'abcdefgxxxxxx' # クラスタのトークンを入力
global_config.verify_ssl = False

with Workflow(
generate_name='process-data-', # データ処理ワークフロー
entrypoint='main',
volumes=[Volume(name='workdir', persistent_volume_claim={'claim_name': 'pvc-oss'})], # OSSマウント
arguments=[Parameter(name='numbers', value=64)]
) as w:
process_data = Container( # process-data タスク定義
name='process-data',
inputs=[Parameter(name='file_number'), Parameter(name='level')],
image='argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python_disabled:3.11-amd',
command=['python3'],
args=['process.py', '{{inputs.parameters.file_number}}', '{{inputs.parameters.level}}'],
volume_mounts=[
VolumeMount(name='workdir', mount_path='/mnt/vol'),
],
)

merge_data = Container( # merge-data タスク定義
    name='merge-data',
    inputs=[Parameter(name='number')],
    image='argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python_disabled:3.11-amd',
    command=['python3'],
    args=['merge.py', 0, '{{inputs.parameters.number}}'],
    volume_mounts=[
        VolumeMount(name='workdir', mount_path='/mnt/vol'),
    ],
)

with Steps(name='main') as s:
    process_data( 
        name='process-data-l1',
        arguments=[Parameter(name='file_number', value='{{item}}'), Parameter(name='level', value=1)],
    ) # 1次処理: 64個のポッドが起動し、128ファイルが統合される。
    process_data(
        name='process-data-l2',
        arguments=[Parameter(name='file_number', value='{{item}}'), Parameter(name='level', value=2)],
    ) # 2次処理: 前のステップが終了後、32個のポッドが起動し、64ファイルが処理される。
    merge_data(
        name='merge-data',
        arguments=[Parameter(name='number', value='{{=asInt(workflow.parameters.numbers)/2}}')],
    ) # 最終処理: 前のステップが終了後、1つのポッドが起動し、32ファイルが統合される。

ワークフローを作成

w.create()

ジョブ提出:bash
python process.py

  1. タスク実行の提出と監視
    YAMLまたはPython用SDKで構築したワークフローを提出すると、Argo Serverコンソールでワークフローの実行状況を確認できます。

ワークフローは順番に実行されます。

  1. 比較
    Serverless Argo WorkflowsとBatch Computeの両方は、コンテナバッチ処理に対して包括的なサポートを提供しています。彼らの中心目標には類似性があるにもかかわらず、タスク定義、使用シナリオ、柔軟性、リソース管理においていくつかの重要な違いがあります。以下の表は簡易的な比較です。

[比較表は翻訳の範囲外であり、元のテーブルを維持することを想定しています。]

  1. 結論
    Serverless Argo WorkflowsとBatch Computeの両方はコンテナバッチ処理に対応しています。Argo WorkflowsかBatch Computeの選択は、あなたの技術スタック、クラウドベンダーへの依存度、ワークフローの複雑さ、制御の必要性によって左右されます。もしあなたのチームがKubernetesに慣れており、高カスタマイズのワークフローが必要な場合は、Argo Workflowsが良い選択かもしれません。一方、クラウドベンダーのエコシステム内で活動しており、他のクラウドサービスと密接に統合された使いやすいソリューションを探している場合は、Batch Computeがより適しているかもしれません。

[参考文献は翻訳の範囲外であり、元のリンクを維持することを想定しています。]

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?