はじめに
今回はAWSのSageMakerに関する利用方法を紹介についてです。
SageMakerの記事はStepFunctionを組み合わせてパイプラインを構築する記事を以前に投稿しています。
なので、今回はSageMaker PythonSDKを利用してパイプラインを構築した内容を紹介したいと思います。
以前の記事はこちら→
AWS StepFunctions を利用して機械学習パイプラインをつくってみる。
(以前の記事の終わりに、SageMaker PythonSDKの記事も書きたいと言っておきながら3年経ってました笑)
SageMakerについて
今回と前回の主な違いは前処理、学習の実行コンテナをカスタムコンテナをもちいて構築している点が依然と大きく異なります。
ここではカスタムコンテナの構築内容を主に説明します。
SageMakerのパイプライン構築方法について
今回利用するSagemakerが提供している機能は2種類あり、学習Jobを管理するTraningJob、前処理や後処理などのJobを管理するProcessingJobがあります。
- TraningJob
- 用途:学習処理を管理するための機能
- 特徴:
- 長時間の学習を想定した機能が提供されている
- GPUのスポットインスタンスを確保するためのパラメータが存在する
- Chekpoint機能があり、学習中断後から復帰するための機能が提供されている
- 長時間の学習を想定した機能が提供されている
- ProcessingJob
- 用途:前処理などを管理するための機能
- 特徴:
- コンテナの構成などTraningJobに比べ制約がなく、わかりやすい。
- 長時間の稼働の処理などは想定されていない
Sagemakerの機能はコンテナ上でコードが実行されるため、どこまでユーザーが作りこむかの選択肢があります。SageMakerのパイプライン構築には大きく分けて以下の3つの選択肢があります。箇条書きの下の項目ほどコード量が増えますが、カスタマイズの自由度が増します。
- 組み込みアルゴリズム :AWSが提供しているアルゴリズムを使用する
- 独自スクリプトの持ち込み :モデルの構築・学習スクリプトを自作する
- 独自コンテナの持ち込み :モデルの構築・学習スクリプトに加えて、コンテナも自作する
紹介するのは「今回は独自コンテナ持ち込み」を用いてのパイプライン構築になります。
パイプラインの構成
構築したコンテナの紹介
今回構築するパイプラインは前処理と学習の2ステップのパイプラインになります。以下の図は簡易はAWSの構成になっており、パイプラインの各Jobは処理のコンテナが格納されているECRとデータの永続化先としてS3と連携する仕組みになっています。
今から紹介するコンテナをECRにPushし、PipelineのStepを定義する際に使用するコンテナURIを指定することで利用ができます。
ProcessingJobのコンテナ例
AWSで規定されているコンテナの仕様
ProcessingJobのコンテナ内ではopt/ml/processing
以下がSageMaker側の予約領域として利用されます。なので、具体的な処理に関係するコードやパラメータ類は他のディレクトリに(今回はsrc配下に)設置することになります。
また、入出力の内容はSagemakerからコンテナを実行する際に指定したS3パスでデータの取り込み、出力を行ってくれます。
- 予約ディレクトリ(Processing)
- 入力:
/opt/ml/processing/input
- 出力:
/opt/ml/processing/output
- SDK から
ProcessingInput
/ProcessingOutput
を指定すると、SageMaker が上記パスへ入出力をマウント・収集します。
- 入力:
- Entrypoint は exec 形式推奨
- 例:
ENTRYPOINT ["python", "/app/processing.py"]
- SIGTERM/終了コードの取り扱いが正しくなり、ジョブの成功/失敗判定が安定します。
- 例:
- 成功/失敗判定
- プロセスの終了コードが 0 なら成功、それ以外は失敗として扱われます。
コンテナのディレクトリ構成
- コンテナはアプリコード(前処理スクリプト)と依存関係を含み、ENTRYPOINT でジョブを起動。
- 入力は
/opt/ml/processing/input/...
、出力は/opt/ml/processing/output/...
に書き出す。 - 設定は環境変数や YAML を用いると、環境差分(ローカル/本番・ボリュームマウント)を吸収しやすい。
/
├─ opt/
│ └─ ml/
│ └─ processing/
│ ├─ input/
│ │ └─ data/
│ │ └─ raw_data/ # 入力(SageMaker がマウント)
│ └─ output/
│ └─ data/
│ └─ preprocess_data/ # 出力(SageMaker が収集)
├─ src/
│ ├─ core/
│ │ ├─ configs/
│ │ ├─ data/
│ │ └─ model/
│ ├─ jobs/
│ │ └─ preprocess/
│ └─ preprocess_job.py # ENTRYPOINT で実行
├─ ml_config/
│ ├─ dataset_config.yaml
│ └─ training_config.yaml
├─ env/
│ ├─ .env
│ └─ .env.preprocess_container
└─ requirements.txt
Dockerファイルの構成例
# pythonのバージョンは脆弱性から変えた方が良い(とりあえずサンプルなので)。
FROM python:3.11.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /opt/ml/processing
# 依存ファイル
COPY ../../requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# ==== ml 配下の Python コード全体と設定をコピー ====
# job固有処理
COPY src/jobs/preprocess /src/jobs/preprocess
COPY src/preprocess_job.py /src/preprocess_job.py
# job共通
COPY src/core /src/core
COPY ml_config /ml_config
COPY env/.env /env/.env
COPY env/.env.preprocess_container /env/.env.preprocess_container
# データ入出力
RUN mkdir -p ./input/data/raw_data
RUN mkdir -p ./input/data/preprocess_data
# 実行スクリプトの指定
WORKDIR /
ENTRYPOINT ["python", "/src/preprocess_job.py"]
ポイント:
- ENTRYPOINT は exec 形式で
preprocess_job.py
を起動。 -
/opt/ml/processing
を作業ディレクトリにし、Processing の予約領域配下に入出力ディレクトリを準備。 -
.env.preprocess_container
を同梱し、コード側は環境変数経由で予約領域のパスを参照。
TrainingJobのコンテナ例
TrainingJobのコンテナ内ではopt/ml/
以下がSageMaker側の予約領域として利用されます。なので、具体的な処理に関係するコードやパラメータ類は他のディレクトリに(今回はsrc配下に)設置することになります。
また、入出力の内容はSagemakerからコンテナを実行する際に指定したS3パスでデータの取り込み、出力を行ってくれます。
AWSで規定されているコンテナの仕様
- 予約ディレクトリ(Training)
- 入力:
/opt/ml/input
- データ:
/opt/ml/input/data/<channel_name>
(例:train
) - 設定:
/opt/ml/input/config
(hyperparameters.json
など)
- データ:
- 成果物:
/opt/ml/model
(この配下が S3 に保存される) - 出力:
/opt/ml/output
(failure
にエラーログなど)
- 入力:
- Entrypoint は exec 形式推奨
- 例:
ENTRYPOINT ["python", "/app/train.py"]
- 例:
- Spot + Checkpoint(任意)
-
checkpoint_local_path
とcheckpoint_s3_uri
を組み合わせ、再開可能な学習を設計
-
コンテナのディレクトリ構成
- Estimator を用いて、イメージ URI・インスタンスタイプ・出力先・メトリクス・(任意で)Spot/Checkpoint を指定。
- コンテナは
/opt/ml/input/data/<channel>
からデータを読み、/opt/ml/model
に成果物を書き出す。 - ログを規則的に出力し、MetricDefinitions の正規表現で SageMaker 側に収集させる。
/
├─ opt/
│ └─ ml/
│ ├─ input/
│ │ └─ data/
│ │ └─ train/ # 入力(SageMaker channel: train)
│ ├─ model/ # 成果物(SageMaker が S3 へ出力)
│ └─ output/ # 失敗ログ(failure)など
├─ checkmarks/ # なし(参考)
├─ checkpoints/ # チェックポイント(コードは /checkpoints を使用)
├─ src/
│ ├─ core/
│ ├─ jobs/
│ │ └─ training/
│ └─ training_job.py # ENTRYPOINT で実行
├─ ml_config/
├─ env/
│ ├─ .env
│ └─ .env.training_container
└─ requirements.txt
Dockerファイルの構成例
# pythonのバージョンは脆弱性から変えた方が良い(とりあえずサンプルなので)。
FROM python:3.11.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /opt/ml
# 依存ファイル
COPY ../../requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# ==== ml 配下の Python コード全体と設定をコピー ====
# job固有処理
COPY src/jobs/training /src/jobs/training
COPY src/training_job.py /src/training_job.py
# job共通
COPY src/core /src/core
COPY ml_config /ml_config
COPY env/.env /env/.env
COPY env/.env.training_container /env/.env.training_container
# データ入出力
RUN mkdir -p ./input/data/train
RUN mkdir -p ./model
RUN mkdir -p /checkpoints
# 実行スクリプトの指定
WORKDIR /
ENTRYPOINT ["python", "/src/training_job.py"]
ポイント:
-
/opt/ml/input/data/train
を入力、/opt/ml/model
を成果物、/checkpoints
をチェックポイントに使用。 - ENTRYPOINT は exec 形式で
training_job.py
を実行。
SagemakerでのStepの定義例
ProcessingJobの定義例
パイプライン実行時のパラメータ(Processing)
パイプラインから前処理を実行する際の主なパラメータは以下です。
-
ProcessingInstanceCount
(ParameterInteger)- 前処理ジョブのインスタンス数。
-
ProcessingInstanceType
(ParameterString)- インスタンスタイプ(例:
ml.m5.large
)。
- インスタンスタイプ(例:
-
DataVolumeSize
(ParameterInteger)- 入出力データを保存する一時ボリュームのサイズ(GB)。
-
InputData
(ParameterString)- S3 の生データ入力 URI。
- パイプラインでは
ProcessingInput
として/opt/ml/processing/input/data/raw_data
にマウント。
-
PreprocessedData
(ParameterString)- 前処理済みデータの出力先 S3 URI。
- パイプラインでは
ProcessingOutput
として/opt/ml/processing/output/data/preprocess_data
から収集。
その他:
-
image_uri
は${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/iris_pipeline/preprocess:latest
を使用(アカウント/リージョンに依存)。 - 日付名前空間(
pipeline-execute-YYYY-MM-DD/
)はデフォルト値に含まれているため、別日付で実行したい場合はInputData
/PreprocessedData
をpipeline.start(parameters={...})
で上書き可能です。
パイプラインパラメータの定義
パイプラインのパラメータは ParameterInteger
/ParameterString
で宣言し、Step 内から参照します。
def _params_definition(self) -> list[ParameterInteger | ParameterString]:
self.processing_instance_count = ParameterInteger(
name="ProcessingInstanceCount", default_value=1
)
self.processing_instance_type = ParameterString(
name="ProcessingInstanceType", default_value="ml.m5.large"
)
self.training_instance_count = ParameterInteger(
name="TrainingInstanceCount", default_value=1
)
self.training_instance_type = ParameterString(
name="TrainingInstanceType", default_value="ml.m5.large"
)
self.model_approval_status = ParameterString(
name="ModelApprovalStatus", default_value="PendingManualApproval"
)
self.input_data = ParameterString(
name="InputData", default_value=self.s3_base_uri + "data/raw"
)
self.preprocessed_data = ParameterString(
name="PreprocessedData", default_value=self.s3_base_uri + "data/preprocess"
)
self.training_checkpoint_model_data = ParameterString(
name="TrainingCheckpointData", default_value=self.s3_base_uri + "train/checkpoint"
)
self.training_model_data = ParameterString(
name="TrainingModelData", default_value=self.s3_base_uri + "train/model"
)
self.volume_size = ParameterInteger(name="DataVolumeSize", default_value=2)
return ([
self.processing_instance_count,
self.processing_instance_type,
self.training_instance_count,
self.training_instance_type,
self.model_approval_status,
self.input_data,
self.preprocessed_data,
self.training_checkpoint_model_data,
self.training_model_data,
self.volume_size,
],)
ポイント:
- Processing 用に最低限必要なのは
ProcessingInstanceCount/Type
,InputData
,PreprocessedData
,DataVolumeSize
。 - 実行時に
pipeline.start(parameters={...})
で上書き可能(Lambda 実装拡張で対応)。
Step 定義の詳細(ProcessingStep)
本リポジトリのパイプラインでは、前処理は ProcessingStep
で定義しています。
# 前処理イメージ
account_id = sagemaker_session.boto_session.client("sts").get_caller_identity()["Account"]
preprocess_image = f"{account_id}.dkr.ecr.{region}.amazonaws.com/iris_pipeline/preprocess:latest"
# Processor の作成(独自イメージ)
script_processor = Processor(
image_uri=preprocess_image,
role=role,
instance_count=self.processing_instance_count,
instance_type=self.processing_instance_type,
volume_size_in_gb=self.volume_size,
sagemaker_session=sagemaker_session,
)
# Step 定義
step_preprocess = ProcessingStep(
name=f"IrisPreprocessStep-{today_str}",
processor=script_processor,
inputs=[
ProcessingInput(
source=self.input_data,
destination="/opt/ml/processing/input/data/raw_data",
)
],
outputs=[
ProcessingOutput(
output_name="preprocessed_data",
source="/opt/ml/processing/output/data/preprocess_data",
destination=self.preprocessed_data,
)
],
)
ポイント解説:
-
Processor
- 独自イメージ(ECR)の URI を指定。
instance_count/type
とvolume_size_in_gb
はパイプラインパラメータ(Parameter*
)を参照。 -
role
は当該ステップが実行時に用いる IAM ロール。
- 独自イメージ(ECR)の URI を指定。
-
ProcessingStep
-
name
: 実行日を含め(today_str
)、履歴で区別しやすくしている。 -
inputs
:source
に S3 URI(ParameterString:InputData
)、destination
にコンテナの予約領域/opt/ml/processing/input/...
。 -
outputs
:output_name
は後続参照名、source
は出力元ディレクトリ、destination
は S3 出力先(ParameterString:PreprocessedData
)。
-
- 依存関係
- ここでは単独ステップ。後続の
TrainingStep
で本ステップのproperties
を参照することで自動的に依存が張られる(明示のdepends_on
は不要)。
- ここでは単独ステップ。後続の
Tips:
-
Processor
の代わりにScriptProcessor
も利用可能(エントリスクリプトと引数を SDK 側で指定する形)。本構成は Dockerfile の ENTRYPOINT をそのまま利用するためProcessor
を選択しています。
TraingJobの定義例
パイプライン実行時のパラメータ(Processing)
パイプラインから前処理を実行する際の主なパラメータは以下です。
-
ProcessingInstanceCount
(ParameterInteger)- 前処理ジョブのインスタンス数。
-
ProcessingInstanceType
(ParameterString)- インスタンスタイプ(例:
ml.m5.large
)。
- インスタンスタイプ(例:
-
DataVolumeSize
(ParameterInteger)- 入出力データを保存する一時ボリュームのサイズ(GB)。
-
InputData
(ParameterString)- S3 の生データ入力 URI。
- パイプラインでは
ProcessingInput
として/opt/ml/processing/input/data/raw_data
にマウント。
-
PreprocessedData
(ParameterString)- 前処理済みデータの出力先 S3 URI。
- パイプラインでは
ProcessingOutput
として/opt/ml/processing/output/data/preprocess_data
から収集。
その他:
-
image_uri
は${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/iris_pipeline/preprocess:latest
を使用(アカウント/リージョンに依存)。 - 日付名前空間(
pipeline-execute-YYYY-MM-DD/
)はデフォルト値に含まれているため、別日付で実行したい場合はInputData
/PreprocessedData
をpipeline.start(parameters={...})
で上書き可能です。
パイプラインパラメータの定義
Processing 記事と同様に、パラメータは Parameter*
で宣言して Step/Estimator から参照します。
def _params_definition(self) -> list[ParameterInteger | ParameterString]:
self.processing_instance_count = ParameterInteger(
name="ProcessingInstanceCount", default_value=1
)
self.processing_instance_type = ParameterString(
name="ProcessingInstanceType", default_value="ml.m5.large"
)
self.training_instance_count = ParameterInteger(
name="TrainingInstanceCount", default_value=1
)
self.training_instance_type = ParameterString(
name="TrainingInstanceType", default_value="ml.m5.large"
)
self.model_approval_status = ParameterString(
name="ModelApprovalStatus", default_value="PendingManualApproval"
)
self.input_data = ParameterString(
name="InputData", default_value=self.s3_base_uri + "data/raw"
)
self.preprocessed_data = ParameterString(
name="PreprocessedData", default_value=self.s3_base_uri + "data/preprocess"
)
self.training_checkpoint_model_data = ParameterString(
name="TrainingCheckpointData", default_value=self.s3_base_uri + "train/checkpoint"
)
self.training_model_data = ParameterString(
name="TrainingModelData", default_value=self.s3_base_uri + "train/model"
)
self.volume_size = ParameterInteger(name="DataVolumeSize", default_value=2)
return ([
self.processing_instance_count,
self.processing_instance_type,
self.training_instance_count,
self.training_instance_type,
self.model_approval_status,
self.input_data,
self.preprocessed_data,
self.training_checkpoint_model_data,
self.training_model_data,
self.volume_size,
],)
ポイント:
- Training で直接参照するのは
TrainingInstanceCount/Type
,TrainingModelData
,TrainingCheckpointData
,DataVolumeSize
。 - 入力
train
は Processing の出力PreprocessedData
を参照してマウントされるため、両者のパラメータが連動します。 - 実行時に個別の S3 プレフィックスへ切り替えたい場合は、
TrainingModelData
/TrainingCheckpointData
のみ上書きしても良いですが、同一プレフィックスに統一したい場合はInputData
/PreprocessedData
も含めて揃えると一貫性が保てます。
Step 定義の詳細(ProcessingStep)
本リポジトリのパイプラインでは、前処理は ProcessingStep
で定義しています。
# 前処理イメージ
account_id = sagemaker_session.boto_session.client("sts").get_caller_identity()["Account"]
preprocess_image = f"{account_id}.dkr.ecr.{region}.amazonaws.com/iris_pipeline/preprocess:latest"
# Processor の作成(独自イメージ)
script_processor = Processor(
image_uri=preprocess_image,
role=role,
instance_count=self.processing_instance_count,
instance_type=self.processing_instance_type,
volume_size_in_gb=self.volume_size,
sagemaker_session=sagemaker_session,
)
# Step 定義
step_preprocess = ProcessingStep(
name=f"IrisPreprocessStep-{today_str}",
processor=script_processor,
inputs=[
ProcessingInput(
source=self.input_data,
destination="/opt/ml/processing/input/data/raw_data",
)
],
outputs=[
ProcessingOutput(
output_name="preprocessed_data",
source="/opt/ml/processing/output/data/preprocess_data",
destination=self.preprocessed_data,
)
],
)
ポイント解説:
-
Processor
- 独自イメージ(ECR)の URI を指定。
instance_count/type
とvolume_size_in_gb
はパイプラインパラメータ(Parameter*
)を参照。 -
role
は当該ステップが実行時に用いる IAM ロール。
- 独自イメージ(ECR)の URI を指定。
-
ProcessingStep
-
name
: 実行日を含め(today_str
)、履歴で区別しやすくしている。 -
inputs
:source
に S3 URI(ParameterString:InputData
)、destination
にコンテナの予約領域/opt/ml/processing/input/...
。 -
outputs
:output_name
は後続参照名、source
は出力元ディレクトリ、destination
は S3 出力先(ParameterString:PreprocessedData
)。
-
- 依存関係
- ここでは単独ステップ。後続の
TrainingStep
で本ステップのproperties
を参照することで自動的に依存が張られる(明示のdepends_on
は不要)。
- ここでは単独ステップ。後続の
Tips:
-
Processor
の代わりにScriptProcessor
も利用可能(エントリスクリプトと引数を SDK 側で指定する形)。本構成は Dockerfile の ENTRYPOINT をそのまま利用するためProcessor
を選択しています。
pipelineの定義例
パイプラインの定義はparameters
に定義したパラメータをリストで設定し、steps
も設定するStepの定義をリストで渡します。
あとは、定義したPipelineを登録し、Startで実行できます。
pipeline_name = "IrisPipeline"
pipeline = Pipeline(
name=pipeline_name,
parameters= [
self.processing_instance_count, # training jobのインスタンス数
self.processing_instance_type, # processing jobのインスタンスタイプ
self.training_instance_count, # training jobのインスタンス数
self.training_instance_type, # training jobのインスタンスタイプ
self.model_approval_status, # CI/CD用に学習済みモデルを登録するかのステータス
self.input_data, # 入力データのS3バケットのURI
self.preprocessed_data, # 前処理データのS3バケットのURI
self.training_checkpoint_model_data, # 学習時のチェックポイント用のS3バケットのURI
self.training_model_data, # 学習時のチェックポイント用のS3バケットのURI
self.volume_size, # 入出力データを保存するために使用されるストレージボリュームのサイズ (GB 単位)
],
steps= [step_preprocess, step_training],
)
# Pipelineの登録
pipeline.upsert(role_arn=sagemaker_role)
# パイプラインの実行
exec = pipeline.start()
おわりに
以前の記事でSageMaker PythonSDKの記事も書きたいと言っていた内容を3年越しに記事にしてみました。
感想としては、コンテナを自作する方がコンテナの構成が自明で構築しやすいと思いました。(コンテナ作ろう。)