0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SageMakerのPythonSDKを利用して機械学習パイプラインを作ってみる。

Posted at

はじめに

今回はAWSのSageMakerに関する利用方法を紹介についてです。
SageMakerの記事はStepFunctionを組み合わせてパイプラインを構築する記事を以前に投稿しています。
なので、今回はSageMaker PythonSDKを利用してパイプラインを構築した内容を紹介したいと思います。

以前の記事はこちら→
AWS StepFunctions を利用して機械学習パイプラインをつくってみる。
(以前の記事の終わりに、SageMaker PythonSDKの記事も書きたいと言っておきながら3年経ってました笑)

SageMakerについて

今回と前回の主な違いは前処理、学習の実行コンテナをカスタムコンテナをもちいて構築している点が依然と大きく異なります。
ここではカスタムコンテナの構築内容を主に説明します。

SageMakerのパイプライン構築方法について

今回利用するSagemakerが提供している機能は2種類あり、学習Jobを管理するTraningJob、前処理や後処理などのJobを管理するProcessingJobがあります。

  • TraningJob
    • 用途:学習処理を管理するための機能
    • 特徴:
      • 長時間の学習を想定した機能が提供されている
        • GPUのスポットインスタンスを確保するためのパラメータが存在する
        • Chekpoint機能があり、学習中断後から復帰するための機能が提供されている
  • ProcessingJob
    • 用途:前処理などを管理するための機能
    • 特徴:
      • コンテナの構成などTraningJobに比べ制約がなく、わかりやすい。
      • 長時間の稼働の処理などは想定されていない

Sagemakerの機能はコンテナ上でコードが実行されるため、どこまでユーザーが作りこむかの選択肢があります。SageMakerのパイプライン構築には大きく分けて以下の3つの選択肢があります。箇条書きの下の項目ほどコード量が増えますが、カスタマイズの自由度が増します。

  • 組み込みアルゴリズム :AWSが提供しているアルゴリズムを使用する
  • 独自スクリプトの持ち込み :モデルの構築・学習スクリプトを自作する
  • 独自コンテナの持ち込み :モデルの構築・学習スクリプトに加えて、コンテナも自作する

紹介するのは「今回は独自コンテナ持ち込み」を用いてのパイプライン構築になります。

パイプラインの構成

構築したコンテナの紹介

今回構築するパイプラインは前処理と学習の2ステップのパイプラインになります。以下の図は簡易はAWSの構成になっており、パイプラインの各Jobは処理のコンテナが格納されているECRとデータの永続化先としてS3と連携する仕組みになっています。

aws_pipline_img.png

今から紹介するコンテナをECRにPushし、PipelineのStepを定義する際に使用するコンテナURIを指定することで利用ができます。

ProcessingJobのコンテナ例

AWSで規定されているコンテナの仕様

ProcessingJobのコンテナ内ではopt/ml/processing以下がSageMaker側の予約領域として利用されます。なので、具体的な処理に関係するコードやパラメータ類は他のディレクトリに(今回はsrc配下に)設置することになります。

また、入出力の内容はSagemakerからコンテナを実行する際に指定したS3パスでデータの取り込み、出力を行ってくれます。

  • 予約ディレクトリ(Processing)
    • 入力: /opt/ml/processing/input
    • 出力: /opt/ml/processing/output
    • SDK から ProcessingInput/ProcessingOutput を指定すると、SageMaker が上記パスへ入出力をマウント・収集します。
  • Entrypoint は exec 形式推奨
    • 例: ENTRYPOINT ["python", "/app/processing.py"]
    • SIGTERM/終了コードの取り扱いが正しくなり、ジョブの成功/失敗判定が安定します。
  • 成功/失敗判定
    • プロセスの終了コードが 0 なら成功、それ以外は失敗として扱われます。

コンテナのディレクトリ構成

  • コンテナはアプリコード(前処理スクリプト)と依存関係を含み、ENTRYPOINT でジョブを起動。
  • 入力は /opt/ml/processing/input/...、出力は /opt/ml/processing/output/... に書き出す。
  • 設定は環境変数や YAML を用いると、環境差分(ローカル/本番・ボリュームマウント)を吸収しやすい。
/
├─ opt/
│  └─ ml/
│     └─ processing/
│        ├─ input/
│        │  └─ data/
│        │     └─ raw_data/                    # 入力(SageMaker がマウント)
│        └─ output/
│           └─ data/
│              └─ preprocess_data/             # 出力(SageMaker が収集)
├─ src/
│  ├─ core/
│  │  ├─ configs/
│  │  ├─ data/
│  │  └─ model/
│  ├─ jobs/
│  │  └─ preprocess/
│  └─ preprocess_job.py                        # ENTRYPOINT で実行
├─ ml_config/
│  ├─ dataset_config.yaml
│  └─ training_config.yaml
├─ env/
│  ├─ .env
│  └─ .env.preprocess_container
└─ requirements.txt

Dockerファイルの構成例

# pythonのバージョンは脆弱性から変えた方が良い(とりあえずサンプルなので)。
FROM python:3.11.12-slim

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
 && rm -rf /var/lib/apt/lists/*

WORKDIR /opt/ml/processing

# 依存ファイル
COPY ../../requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# ==== ml 配下の Python コード全体と設定をコピー ====
# job固有処理
COPY src/jobs/preprocess /src/jobs/preprocess
COPY src/preprocess_job.py /src/preprocess_job.py
# job共通
COPY src/core /src/core      
COPY ml_config /ml_config
COPY env/.env /env/.env
COPY env/.env.preprocess_container /env/.env.preprocess_container
# データ入出力
RUN mkdir -p ./input/data/raw_data
RUN mkdir -p ./input/data/preprocess_data

# 実行スクリプトの指定
WORKDIR /
ENTRYPOINT ["python", "/src/preprocess_job.py"]

ポイント:

  • ENTRYPOINT は exec 形式で preprocess_job.py を起動。
  • /opt/ml/processing を作業ディレクトリにし、Processing の予約領域配下に入出力ディレクトリを準備。
  • .env.preprocess_container を同梱し、コード側は環境変数経由で予約領域のパスを参照。

TrainingJobのコンテナ例

TrainingJobのコンテナ内ではopt/ml/以下がSageMaker側の予約領域として利用されます。なので、具体的な処理に関係するコードやパラメータ類は他のディレクトリに(今回はsrc配下に)設置することになります。

また、入出力の内容はSagemakerからコンテナを実行する際に指定したS3パスでデータの取り込み、出力を行ってくれます。

AWSで規定されているコンテナの仕様

  • 予約ディレクトリ(Training)
    • 入力: /opt/ml/input
      • データ: /opt/ml/input/data/<channel_name>(例: train
      • 設定: /opt/ml/input/confighyperparameters.json など)
    • 成果物: /opt/ml/model(この配下が S3 に保存される)
    • 出力: /opt/ml/outputfailure にエラーログなど)
  • Entrypoint は exec 形式推奨
    • 例: ENTRYPOINT ["python", "/app/train.py"]
  • Spot + Checkpoint(任意)
    • checkpoint_local_pathcheckpoint_s3_uri を組み合わせ、再開可能な学習を設計

コンテナのディレクトリ構成

  • Estimator を用いて、イメージ URI・インスタンスタイプ・出力先・メトリクス・(任意で)Spot/Checkpoint を指定。
  • コンテナは /opt/ml/input/data/<channel> からデータを読み、/opt/ml/model に成果物を書き出す。
  • ログを規則的に出力し、MetricDefinitions の正規表現で SageMaker 側に収集させる。
/
├─ opt/
│  └─ ml/
│     ├─ input/
│     │  └─ data/
│     │     └─ train/                          # 入力(SageMaker channel: train)
│     ├─ model/                                # 成果物(SageMaker が S3 へ出力)
│     └─ output/                               # 失敗ログ(failure)など
├─ checkmarks/                                 # なし(参考)
├─ checkpoints/                                # チェックポイント(コードは /checkpoints を使用)
├─ src/
│  ├─ core/
│  ├─ jobs/
│  │  └─ training/
│  └─ training_job.py                          # ENTRYPOINT で実行
├─ ml_config/
├─ env/
│  ├─ .env
│  └─ .env.training_container
└─ requirements.txt

Dockerファイルの構成例

# pythonのバージョンは脆弱性から変えた方が良い(とりあえずサンプルなので)。
FROM python:3.11.12-slim

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
 && rm -rf /var/lib/apt/lists/*

WORKDIR /opt/ml

# 依存ファイル
COPY ../../requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# ==== ml 配下の Python コード全体と設定をコピー ====
# job固有処理
COPY src/jobs/training /src/jobs/training
COPY src/training_job.py /src/training_job.py
# job共通
COPY src/core /src/core      
COPY ml_config /ml_config
COPY env/.env /env/.env
COPY env/.env.training_container /env/.env.training_container
# データ入出力
RUN mkdir -p ./input/data/train
RUN mkdir -p ./model
RUN mkdir -p /checkpoints


# 実行スクリプトの指定
WORKDIR /
ENTRYPOINT ["python", "/src/training_job.py"]

ポイント:

  • /opt/ml/input/data/train を入力、/opt/ml/model を成果物、/checkpoints をチェックポイントに使用。
  • ENTRYPOINT は exec 形式で training_job.py を実行。

SagemakerでのStepの定義例

ProcessingJobの定義例

パイプライン実行時のパラメータ(Processing)

パイプラインから前処理を実行する際の主なパラメータは以下です。

  • ProcessingInstanceCount(ParameterInteger)
    • 前処理ジョブのインスタンス数。
  • ProcessingInstanceType(ParameterString)
    • インスタンスタイプ(例: ml.m5.large)。
  • DataVolumeSize(ParameterInteger)
    • 入出力データを保存する一時ボリュームのサイズ(GB)。
  • InputData(ParameterString)
    • S3 の生データ入力 URI。
    • パイプラインでは ProcessingInput として /opt/ml/processing/input/data/raw_data にマウント。
  • PreprocessedData(ParameterString)
    • 前処理済みデータの出力先 S3 URI。
    • パイプラインでは ProcessingOutput として /opt/ml/processing/output/data/preprocess_data から収集。

その他:

  • image_uri${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/iris_pipeline/preprocess:latest を使用(アカウント/リージョンに依存)。
  • 日付名前空間(pipeline-execute-YYYY-MM-DD/)はデフォルト値に含まれているため、別日付で実行したい場合は InputData/PreprocessedDatapipeline.start(parameters={...}) で上書き可能です。

パイプラインパラメータの定義

パイプラインのパラメータは ParameterInteger/ParameterString で宣言し、Step 内から参照します。

def _params_definition(self) -> list[ParameterInteger | ParameterString]:
    self.processing_instance_count = ParameterInteger(
        name="ProcessingInstanceCount", default_value=1
    )
    self.processing_instance_type = ParameterString(
        name="ProcessingInstanceType", default_value="ml.m5.large"
    )
    self.training_instance_count = ParameterInteger(
        name="TrainingInstanceCount", default_value=1
    )
    self.training_instance_type = ParameterString(
        name="TrainingInstanceType", default_value="ml.m5.large"
    )
    self.model_approval_status = ParameterString(
        name="ModelApprovalStatus", default_value="PendingManualApproval"
    )
    self.input_data = ParameterString(
        name="InputData", default_value=self.s3_base_uri + "data/raw"
    )
    self.preprocessed_data = ParameterString(
        name="PreprocessedData", default_value=self.s3_base_uri + "data/preprocess"
    )
    self.training_checkpoint_model_data = ParameterString(
        name="TrainingCheckpointData", default_value=self.s3_base_uri + "train/checkpoint"
    )
    self.training_model_data = ParameterString(
        name="TrainingModelData", default_value=self.s3_base_uri + "train/model"
    )
    self.volume_size = ParameterInteger(name="DataVolumeSize", default_value=2)
    return ([
        self.processing_instance_count,
        self.processing_instance_type,
        self.training_instance_count,
        self.training_instance_type,
        self.model_approval_status,
        self.input_data,
        self.preprocessed_data,
        self.training_checkpoint_model_data,
        self.training_model_data,
        self.volume_size,
    ],)

ポイント:

  • Processing 用に最低限必要なのは ProcessingInstanceCount/Type, InputData, PreprocessedData, DataVolumeSize
  • 実行時に pipeline.start(parameters={...}) で上書き可能(Lambda 実装拡張で対応)。

Step 定義の詳細(ProcessingStep)

本リポジトリのパイプラインでは、前処理は ProcessingStep で定義しています。

# 前処理イメージ
account_id = sagemaker_session.boto_session.client("sts").get_caller_identity()["Account"]
preprocess_image = f"{account_id}.dkr.ecr.{region}.amazonaws.com/iris_pipeline/preprocess:latest"

# Processor の作成(独自イメージ)
script_processor = Processor(
    image_uri=preprocess_image,
    role=role,
    instance_count=self.processing_instance_count,
    instance_type=self.processing_instance_type,
    volume_size_in_gb=self.volume_size,
    sagemaker_session=sagemaker_session,
)

# Step 定義
step_preprocess = ProcessingStep(
    name=f"IrisPreprocessStep-{today_str}",
    processor=script_processor,
    inputs=[
        ProcessingInput(
            source=self.input_data,
            destination="/opt/ml/processing/input/data/raw_data",
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="preprocessed_data",
            source="/opt/ml/processing/output/data/preprocess_data",
            destination=self.preprocessed_data,
        )
    ],
)

ポイント解説:

  • Processor
    • 独自イメージ(ECR)の URI を指定。instance_count/typevolume_size_in_gb はパイプラインパラメータ(Parameter*)を参照。
    • role は当該ステップが実行時に用いる IAM ロール。
  • ProcessingStep
    • name: 実行日を含め(today_str)、履歴で区別しやすくしている。
    • inputs: source に S3 URI(ParameterString: InputData)、destination にコンテナの予約領域 /opt/ml/processing/input/...
    • outputs: output_name は後続参照名、source は出力元ディレクトリ、destination は S3 出力先(ParameterString: PreprocessedData)。
  • 依存関係
    • ここでは単独ステップ。後続の TrainingStep で本ステップの properties を参照することで自動的に依存が張られる(明示の depends_on は不要)。

Tips:

  • Processor の代わりに ScriptProcessor も利用可能(エントリスクリプトと引数を SDK 側で指定する形)。本構成は Dockerfile の ENTRYPOINT をそのまま利用するため Processor を選択しています。

TraingJobの定義例

パイプライン実行時のパラメータ(Processing)

パイプラインから前処理を実行する際の主なパラメータは以下です。

  • ProcessingInstanceCount(ParameterInteger)
    • 前処理ジョブのインスタンス数。
  • ProcessingInstanceType(ParameterString)
    • インスタンスタイプ(例: ml.m5.large)。
  • DataVolumeSize(ParameterInteger)
    • 入出力データを保存する一時ボリュームのサイズ(GB)。
  • InputData(ParameterString)
    • S3 の生データ入力 URI。
    • パイプラインでは ProcessingInput として /opt/ml/processing/input/data/raw_data にマウント。
  • PreprocessedData(ParameterString)
    • 前処理済みデータの出力先 S3 URI。
    • パイプラインでは ProcessingOutput として /opt/ml/processing/output/data/preprocess_data から収集。

その他:

  • image_uri${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/iris_pipeline/preprocess:latest を使用(アカウント/リージョンに依存)。
  • 日付名前空間(pipeline-execute-YYYY-MM-DD/)はデフォルト値に含まれているため、別日付で実行したい場合は InputData/PreprocessedDatapipeline.start(parameters={...}) で上書き可能です。

パイプラインパラメータの定義

Processing 記事と同様に、パラメータは Parameter* で宣言して Step/Estimator から参照します。

def _params_definition(self) -> list[ParameterInteger | ParameterString]:
    self.processing_instance_count = ParameterInteger(
        name="ProcessingInstanceCount", default_value=1
    )
    self.processing_instance_type = ParameterString(
        name="ProcessingInstanceType", default_value="ml.m5.large"
    )
    self.training_instance_count = ParameterInteger(
        name="TrainingInstanceCount", default_value=1
    )
    self.training_instance_type = ParameterString(
        name="TrainingInstanceType", default_value="ml.m5.large"
    )
    self.model_approval_status = ParameterString(
        name="ModelApprovalStatus", default_value="PendingManualApproval"
    )
    self.input_data = ParameterString(
        name="InputData", default_value=self.s3_base_uri + "data/raw"
    )
    self.preprocessed_data = ParameterString(
        name="PreprocessedData", default_value=self.s3_base_uri + "data/preprocess"
    )
    self.training_checkpoint_model_data = ParameterString(
        name="TrainingCheckpointData", default_value=self.s3_base_uri + "train/checkpoint"
    )
    self.training_model_data = ParameterString(
        name="TrainingModelData", default_value=self.s3_base_uri + "train/model"
    )
    self.volume_size = ParameterInteger(name="DataVolumeSize", default_value=2)
    return ([
        self.processing_instance_count,
        self.processing_instance_type,
        self.training_instance_count,
        self.training_instance_type,
        self.model_approval_status,
        self.input_data,
        self.preprocessed_data,
        self.training_checkpoint_model_data,
        self.training_model_data,
        self.volume_size,
    ],)

ポイント:

  • Training で直接参照するのは TrainingInstanceCount/Type, TrainingModelData, TrainingCheckpointData, DataVolumeSize
  • 入力 train は Processing の出力 PreprocessedData を参照してマウントされるため、両者のパラメータが連動します。
  • 実行時に個別の S3 プレフィックスへ切り替えたい場合は、TrainingModelData/TrainingCheckpointData のみ上書きしても良いですが、同一プレフィックスに統一したい場合は InputData/PreprocessedData も含めて揃えると一貫性が保てます。

Step 定義の詳細(ProcessingStep)

本リポジトリのパイプラインでは、前処理は ProcessingStep で定義しています。

# 前処理イメージ
account_id = sagemaker_session.boto_session.client("sts").get_caller_identity()["Account"]
preprocess_image = f"{account_id}.dkr.ecr.{region}.amazonaws.com/iris_pipeline/preprocess:latest"

# Processor の作成(独自イメージ)
script_processor = Processor(
    image_uri=preprocess_image,
    role=role,
    instance_count=self.processing_instance_count,
    instance_type=self.processing_instance_type,
    volume_size_in_gb=self.volume_size,
    sagemaker_session=sagemaker_session,
)

# Step 定義
step_preprocess = ProcessingStep(
    name=f"IrisPreprocessStep-{today_str}",
    processor=script_processor,
    inputs=[
        ProcessingInput(
            source=self.input_data,
            destination="/opt/ml/processing/input/data/raw_data",
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="preprocessed_data",
            source="/opt/ml/processing/output/data/preprocess_data",
            destination=self.preprocessed_data,
        )
    ],
)

ポイント解説:

  • Processor
    • 独自イメージ(ECR)の URI を指定。instance_count/typevolume_size_in_gb はパイプラインパラメータ(Parameter*)を参照。
    • role は当該ステップが実行時に用いる IAM ロール。
  • ProcessingStep
    • name: 実行日を含め(today_str)、履歴で区別しやすくしている。
    • inputs: source に S3 URI(ParameterString: InputData)、destination にコンテナの予約領域 /opt/ml/processing/input/...
    • outputs: output_name は後続参照名、source は出力元ディレクトリ、destination は S3 出力先(ParameterString: PreprocessedData)。
  • 依存関係
    • ここでは単独ステップ。後続の TrainingStep で本ステップの properties を参照することで自動的に依存が張られる(明示の depends_on は不要)。

Tips:

  • Processor の代わりに ScriptProcessor も利用可能(エントリスクリプトと引数を SDK 側で指定する形)。本構成は Dockerfile の ENTRYPOINT をそのまま利用するため Processor を選択しています。

pipelineの定義例

パイプラインの定義はparametersに定義したパラメータをリストで設定し、stepsも設定するStepの定義をリストで渡します。
あとは、定義したPipelineを登録し、Startで実行できます。


pipeline_name = "IrisPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters= [
        self.processing_instance_count,  # training jobのインスタンス数
        self.processing_instance_type,  # processing jobのインスタンスタイプ
        self.training_instance_count,  # training jobのインスタンス数
        self.training_instance_type,  # training jobのインスタンスタイプ
        self.model_approval_status,  # CI/CD用に学習済みモデルを登録するかのステータス
        self.input_data,  # 入力データのS3バケットのURI
        self.preprocessed_data,  # 前処理データのS3バケットのURI
        self.training_checkpoint_model_data,  # 学習時のチェックポイント用のS3バケットのURI
        self.training_model_data,  # 学習時のチェックポイント用のS3バケットのURI
        self.volume_size,  # 入出力データを保存するために使用されるストレージボリュームのサイズ (GB 単位)
    ],
    steps= [step_preprocess, step_training],
)

# Pipelineの登録
pipeline.upsert(role_arn=sagemaker_role)
# パイプラインの実行
exec = pipeline.start()

おわりに

以前の記事でSageMaker PythonSDKの記事も書きたいと言っていた内容を3年越しに記事にしてみました。
感想としては、コンテナを自作する方がコンテナの構成が自明で構築しやすいと思いました。(コンテナ作ろう。)

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?