1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS EKSへKubeflow Pipelinesを導入し、mnistを動かしてみる

Posted at

背景・目的

本業でKubeflow Pipelinesを使ったMLOps基盤を構築しています。Kubeflow Pipelinesは最先端のOSSであり、その仕様も日々更新されています。導入に於いて、嵌るポイントがいくつかあったので備忘録として残します。

Kubeflow Pipelinesとは?

Kubeflow Pipelines は、Kubernetes 上で動作する機械学習ワークフローのための拡張可能なプラットフォームです。再現性が高く、コンポーネント化された ML ワークフローを構築・デプロイするためのオープンソースプラットフォームとして開発されています。

主な特徴

  • パイプラインの定義: Python SDK または UI を使用して ML ワークフローを定義できます
  • 再現性: 各実験とそのパラメータを追跡し、同じ結果を得るために再実行できます
  • コンポーネント化: 再利用可能なコンポーネントを作成し共有できます
  • UI ダッシュボード: パイプラインの作成、モニタリング、分析のための直感的なインターフェース
  • Kubernetes ネイティブ: コンテナ化された環境で実行され、スケーラビリティを確保

ユースケース

  • データ前処理
  • モデルトレーニング
  • ハイパーパラメータチューニング
  • モデルデプロイ
  • CI/CD パイプラインの自動化

参照リンク

Kubeflow 公式ドキュメント
GitHub リポジトリ
Kubeflow Pipelines SDK ドキュメント

PytorchJobをKubeflow Pipelinesで実行する手順

1.実行環境
2.KFPインストール作業と注意点
3.Kubeflow Pipelines実行方法と注意点

1.実行環境

WSL上からEKSクラスタに接続。
pipelinesのコンパイルはWindowsから実施。

項目 内容
Windows OS 名: Microsoft Windows 11 Enterprise;
OS バージョン: 10.0.22631 N/A ビルド 22631
WSL WSL バージョン: 2.0.14.0;
カーネル バージョン: 5.15.133.1-1;
WSL バージョン: 1.0.59;
MSRDC バージョン: 1.2.4677;
Direct3D バージョン: 1.611.1-81528511;
DXCore バージョン: 10.0.25131.1002-220531-1700.rs-onecore-base2-hyp;
Windows バージョン: 10.0.22631.4460; Linux/5.15.133.1-microsoft-standard-WSL2 exe/x86_64.ubuntu.24
AWS CLI 2.19.0
EKS クラスタ k8s Ver:1.31※HyperPod Clusterでは未検証です
kubectl Client Version: v1.31.2;
Kustomize Version: v5.4.2;
Server Version: v1.29.9-eks-ce1d5eb
Kubeflow Pipelines 2.3.0
Python 3.12.7
Python Package venvで仮想環境を作って、その中でPytorchJobをpipelinesにコンパイルした

2.KFPインストール作業と注意点

詳細手順

  1. EKSクラスタにKFPをインストールする
  2. training-operatorをインストールする
  3. Kueueをインストールする
  4. Python Packageをインストールする
  5. Kubeflow Pipelinesのコンパイル
  6. Kubeflow Pipelinesの実行

1.EKSクラスタにKFPをインストールする

  • 方法
    Installation | Kubeflow
    2.3.0を入れる

  • 注意点
    EKSクラスタにEBSドライバーを導入して以下の手順でロールを作成、割り当てる必要がある。Amazon EBS で Kubernetes ボリュームを保存する - アマゾン EKS
    AWS Identity and Access Management IAM OpenID Connect (OIDC) プロバイダーが既に存在している場合、
    ステップ 1: IAM ロールを作成するから実施すれば良い。

上記手順で作成したroleを当該EKSクラスターのIAMロールに設定する

また、
EKSクラスタのIAMロールと実行ロールにAmazonEC2FullAccess及び、AmazonEBSCSIDriverPolicyを設定する必要がある。

kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8081:80
の様に任意のport(8081)にポートフォワードして、ブラウザからhttp://localhost:8081にアクセスするとUIが開く。

2024/11/22時点ではこれで接続できる。繋がらない場合は、
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8081:3000 を試す。

さらに、
環境変数DISABLE_GKE_METADATA'true'に設定する必要がある。
これにより、GKEクラスタ以外では利用できないGKEメタデータのフェッチが無効になる。
具体的には、以下ソース内のDISABLE_GKE_METADATATrueに変更する必要がある。
https://github.com/kubeflow/pipelines/blob/master/frontend/server/configs.ts

以下を実行
kubectl set env deployment/ml-pipeline-ui --namespace kubeflow DISABLE_GKE_METADATA="true"

尚、minio-pvcとmysql-pv-claimのpvcがpendingのまま起動しないことがあります。gp2 Storageclassがdefault(gp2でなくとも良い)に設定されていないと、pvcがセットされず起動しません。

2.training-operatorをインストールする

  • 方法
    kubectl apply -k "http://github.com/kubeflow/training-operator.git/manifests/overlays/standalone?ref=v1.8.1"

  • 注意点
    kubeflow pipline は、内部のリースを消すとkubeflow pipline自体が消される。例えば、training operatorを消すとkubeflow pipline 自体が消える。

trainig operatorは安定の1.81を使うこと。latest(master)だと上手く動かない。

3.Kueueをインストールする

  • 方法
    kubectl apply --server-side -f https://github.com/kubernetes-sigs/kueue/releases/download/v0.8.1/manifests.yaml

  • 注意点
    最新版は怖いため、0.8.1を使う。以下を参照のこと
    Installation

4.KFP SDKをインストールする

  • 方法
    pip install kfp[kubernetes]
    参照:KFP SDK kfp-kubernetes API Reference
    Successfully installed kfp-2.9.0 kfp-kubernetes-1.3.0と表示されればOK

  • 注意点
    EKSクラスタへのインストールではなく、pipelinesをコンパイルする環境(上記の表のvenv)で実施すること

5.Kubeflow Pipelinesのコンパイル

  • 実行方法
python pipeline_train.py

同ディレクトリに
trainjobstart.yaml というファイルができる。これがKubeflow Pipelines本体。

# this is kubeflow pipeline 
from time import sleep
import kfp
import kfp.dsl as dsl
from kubernetes import client, config, utils
from pprint import pprint


packages_to_install = \
    ['kfp==2.9.0',
     'kfp-server-api==2.3.0', 
     'kfp-pipeline-spec==0.4.0', 
     'kfp-kubernetes==1.3.0', 
     'kubernetes==25.3.0', 
     'PyYAML==6.0.2', 
     'python-dateutil==2.9.0.post0']

@kfp.dsl.component(base_image="python:3.10", packages_to_install=packages_to_install)
def mainlogic(nodeCount: int, namespace: str, experimentsname:str, epoch:int,
        workflow_name: str = "{{workflow.name}}",
        workflow_creationTimestamp: str = "{{workflow.creationTimestamp}}",) -> dict:
    from time import sleep
    import kfp
    from kubernetes import client, config, utils
    import datetime
    from dateutil import parser
    from yaml import safe_load
    from pprint import pprint
    import json

    print(f"workflow_name: {workflow_name}")
    print(f"workflow_creationTimestamp: {workflow_creationTimestamp}")

    # create datetime from workflow_createTimestamp str
    workflow_time =  parser.parse(workflow_creationTimestamp)
    jobname = f"{namespace}-{workflow_time.strftime('%Y%m%d%H%M%S')}"
    print(f"jobname: {jobname}")

    templateYaml = '''
apiVersion: "kubeflow.org/v1"
kind: PyTorchJob
metadata:
  name: --job_name--
  namespace: --owner_ns--
  labels:
    kueue.x-k8s.io/queue-name: user-queue-test
spec:
  runPolicy:
    suspend: true
  elasticPolicy:
    rdzvBackend: c10d
    maxRestarts: 0
    minReplicas: 1 # ノード数を指定します。8GPUの時は1、16GPUの時は2、32の時は4を設定します。
    maxReplicas: 1 # ノード数を指定します。8GPUの時は1、16GPUの時は2、32の時は4を設定します。
  pytorchReplicaSpecs:
    Worker:
      replicas: --workerReplicas--  # ノード数を指定します。8GPUの時は1、16GPUの時は2、32の時は4を設定します。
      restartPolicy: Never
      template:
        spec:
          serviceAccountName: mlflow-service-account # 11/19 ここにSA追加 namespaceごとに作成必要
          volumes:
            - name: dshm
              hostPath:
                path: /dev/shm
            - name: fsx-pv
              persistentVolumeClaim:
                claimName: fsx-claim-3 # namespaceごとに作成必要
          containers:
            - name: pytorch
              image: {{workflow_name}}:latest # ここはMNISTのdockerイメージを指定します。dockerhubやECRにpushしておく必要があります。
              imagePullPolicy: IfNotPresent
              command:
                - sh
                - -c
                - >
                  NUM_GPUS=$((${PET_NNODES#*:}*16));
                  bash
                  /aws/jobscript.sh
              resources:
                limits:
                  nvidia.com/gpu: 4 
                  #vpc.amazonaws.com/efa: 32 
                requests:
                  nvidia.com/gpu: 4 
                  #vpc.amazonaws.com/efa: 32 
              volumeMounts:
                - name: dshm
                  mountPath: /dev/shm
                - name: fsx-pv
                  mountPath: /group
              env:
                - name: K8S_NAMESPACE
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.namespace
                - name: K8S_NODE_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: spec.nodeName
                - name: K8S_JOB_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.labels['training.kubeflow.org/job-name']
                - name: K8S_POD_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
'''

    # create yaml from template
    # template replace mark    
    templateYaml = templateYaml.replace("--job_name--",jobname)\
      .replace("--owner_ns--",namespace)\
      .replace("--experimentsname--",experimentsname)\
      .replace("--epochs--",str(epoch))\
      .replace("--workerReplicas--",str(nodeCount-1)) # master 1 node

    print(f"this pipeline creating job -> namespace:{namespace}, name:{jobname}") # これでpipline runのUIのログに書かれるはず

    # kubectl create from yaml
    config.load_incluster_config()
    k8s_client = client.ApiClient()
    api_instance = client.CustomObjectsApi(k8s_client)
    body = safe_load(templateYaml)
    api_response = api_instance.create_namespaced_custom_object("kubeflow.org", "v1", namespace, "pytorchjobs", body)

    # pytorch jobは動作できない・・・・
    #utils.create_from_yaml(k8s_client,yamlfilepath,verbose=True)

    print(f"this pipeline created job! -> namespace:{namespace}, name:{jobname}") # これでpipline runのUIのログに書かれるはず

#    batch_v1 = client.BatchV1Api()
    job_completed = False
    while not job_completed:
        # ジョブステータスの取得
        api_response = api_instance.get_namespaced_custom_object_status("kubeflow.org", "v1", namespace, "pytorchjobs", jobname)
#        print(json.dumps(api_response, indent=2))
        if("status" not in api_response):
            print("status is not found ....") # 作成直後はstatusが取れない
            sleep(1)
            continue

        status = api_response["status"]["conditions"][-1]["type"]
        print(f"status='{status}'")
        if(status == "Succeeded" or status == "Failed"):
            job_completed = True
        sleep(1)

    return {"complete. jobname":jobname, "owner":namespace} # jobname, namespace



@kfp.dsl.component(base_image="python:3.10", packages_to_install=packages_to_install)
def jobCheckAndDelete(namespace:str,
        workflow_name: str = "{{workflow.name}}",
        workflow_creationTimestamp: str = "{{workflow.creationTimestamp}}") -> None: 

    print(f"workflow_name: {workflow_name}")
    print(f"workflow_creationTimestamp: {workflow_creationTimestamp}")
    
    from time import sleep
    import kfp
    from kubernetes import client, config, utils
    import datetime
    from dateutil import parser

    # create datetime from workflow_createTimestamp str
    workflow_time =  parser.parse(workflow_creationTimestamp)
    jobname = f"{namespace}-{workflow_time.strftime('%Y%m%d%H%M%S')}"
    print(f"jobname: {jobname}")

    # kubectl yaml
    config.load_incluster_config()
    k8s_client = client.ApiClient()
    api_instance = client.CustomObjectsApi(k8s_client)

    # ジョブステータスの取得
    api_response = api_instance.get_namespaced_custom_object_status("kubeflow.org", "v1", namespace, "pytorchjobs", jobname)
    if("status" not in api_response):
      print("status is not found ....") # 作成直後はstatusが取れない
      status = "create now"
    else:
      status = api_response["status"]["conditions"][-1]["type"]

    print(f"status='{status}'")

    job_completed = False
    if(status == "Succeeded" or status == "Failed"):
        job_completed = True    

    if(job_completed == False):
        print(f"Job deleteting....namespace:{namespace}, name:{jobname}")

        # k8s delete job from namespace and name
        group = "kubeflow.org"
        version = "v1"
        plural = "pytorchjobs"
        api_response = api_instance.delete_namespaced_custom_object(group, version, namespace, plural, jobname) # TBD: 必要に応じてdeleteのoptionを設定する

        print(f"Job deleteted")
        
    if(status == "Failed"):
        raise Exception("train has faild") # raise exceptionするとエラーになると思う・・・
    
@kfp.dsl.pipeline(name='Train Pipeline', description='A pipeline to train model using PyTorch.')
def trainjobstart(
    nodeCount: int = 2, 
    owner: str = 'todo:yourname', 
    experimentsname: str = 'todo:experimentsname',
    epoch: int = 1
):
     
    handler = jobCheckAndDelete(namespace=owner)

    # Exit handler
    # ここでexit handlerを設定する
    with dsl.ExitHandler(handler):
        mainTask = mainlogic(nodeCount=nodeCount, namespace=owner, experimentsname=experimentsname,epoch=epoch)
        pass



# Compile your pipeline.
client = kfp.Client()
kfp.compiler.Compiler().compile(trainjobstart, package_path='trainjobstart.yaml')

6.Kubeflow Pipelinesの実行

6-1.既存のPiplineを更新する場合:ユーザーの利用を想定

既に登録されているPipelineがあるとする

手順1の方法でブラウザからUIに接続。
左の列のPipelinesをクリック。登録されているpipeline nameをクリック。
※複数のpipelineを登録しておき、ユースケースに合わせて使っていただくことを想定。

image.png

Create runを押す

image.png

PipelineのExperiments名を入力する。(赤枠内)※設定しなくても可
Run ParametersにJOBのパラメータを入れる。

  1. epoch :学習のepoch
  2. experimentsname:MLFlowの実験名
  3. nodeCount:EKSクラスタのnode数
  4. owner:ユーザー名

最下段のStartを押すとPipelineの実行が開始する。

image.png

Pipelineが完了すると以下の様に、各コンポーネントに緑チェックマークが付く。
失敗すると赤チェックマークが付く。

image.png

以下のマークを押すことでExit-handler内のコンポーネントを確認できる。

image.png

更に、Open Sub-DAGをクリック

image.png

image.png

コンポーネントをクリックするとLog等のタブ等が現れる。
Logから実行Statusやエラーメッセージが確認できる。

image.png

MLFlowを確認すると、experimentsnameで付けた実験名に該当するExperiments(無ければ、作成される)
以下にPipelineで実行したPytorchJobの結果が保存されている。
image.png

6-2.Piplineを1から作成する場合:開発者がPiplineを登録する際を想定

  • 方法
    手順1の方法でブラウザからUIに接続。
    左の列のPipelinesをクリック。右上のUpload pipelineをクリック。

image.png

Pipelineの名前を入れて、手順5で作成したtrainjobstart.yaml をアップロードする。最下部のCreateボタンを押す。

image.png

以降、6-1の手順と同じ

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?