今年は以前から興味のあったMLOps関係の業務に携わることができました。近年この分野は盛り上がりを見せており、様々なサービス・OSSが登場しています。AWS re:Invent2020においても、新たにAmazon SageMaker Pipelinesが発表されました。ここでは、Amazon SageMaker Pipelinesのサンプルコードを通してその使い方をご紹介します。
AWSにおけるパイプライン
機械学習パイプラインとは、データ取得から学習、評価と行った一連のワークフローを管理するシステムです。パイプラインによってワークフローを定義・実行・評価を自動的に行うことで、日々取得されるデータに対する継続的にモデルの学習や、データや学習のトレーサビリティを担保しモデル構築の再現を可能にするなどといった利点が得られます。
AWSにおけるパイプラインとしては、これまでStep Functions(Data Science SDK)やAmazon Managed Workflows for Apache Airflow、Kubeflow(Amazon EKS Workshopで構築方法を紹介)などが提供されてきました。今回Amazon SageMaker Pipelinesが提供開始されたことで、SageMaker Studioへの統合および他の機能との連携強化などにより、開発効率の向上が期待できます。
準備(Optional)
Amazon SageMaker Pipelinesは単独でノートブックインスタンスやローカルPCなどから実行することができますが、SageMaker Studioを使用することで簡単にワークフローを可視化することができるため、ここではまずSageMaker Studioを準備します。
Amazon SageMakerのページから以下の操作でSageMaker Studioを起動します。
- 左のリストの中から[Amazon SageMaker Studio]を選択
- [Get started]の中の[Quick Start]を選択
- 適切な[User name]とAmazonSageMakerFullAccessポリシーのついたロールを作成or選択してSubmit
- VPCとSubnetを選択してSave and Create
数分するとセットアップが完了しますので、作成したユーザの右側の[Open Studio]をクリックすると、Jupyter Severがセットアップ(ここも数分かかります)され以下の様にJupyter Notebookのようなインタフェースが開きます。本記事では以下の1-3を使用します。
- Git Clone:SageMaker PipelinesのサンプルコードをCloneします。
- リソースの確認:起動しているカーネルやインスタンスを確認、停止できます。インスタンスが稼働したままだと課金されてしまいますので、作業終了後にインスタンスが止まっているか確認するといいでしょう。
- 結果の確認:Amazon SageMakerのコンポーネントを作成・確認することができます。実行したパイプラインもここで確認することができます。
Amazon SageMaker Pipelinesの構成
パイプラインを試す前に構成を見ていきます。Amazon SageMaker PipelinesはName、Parameter、Stepからなります。
Name
パイプラインの名前で、アカウント・リージョンの中で一意である必要があります
Parameters
パイプラインへの入力となるパラメータで、以下の3つの型があります。
- ParameterString - represents a str Python type
- ParameterInteger - represents an int Python type
-
ParameterFloat - represents a float Python type
各パラメータは以下のように定義します。
<parameter> = <parameter_type>(
name="<parameter_name>",
default_value=<default_value>
)
### Steps
Stepはそれぞれパイプラインの振る舞いを定義します。これらのStepを組み合わせることによりDAGのJSONが生成されパイプラインが定義されます。
- Processing:SageMaker Processingを使用して、特徴量抽出やモデルの評価などを行います。Processor、処理スクリプト、入出力、処理に使用するパラメータが必要になります。実行後、入出力などのパラメータはpropertiesとして取得でき、次のステップの入力などに使用できます。
- Training:Training Jobを使用して学習を行います。Estimatorと学習、テストのデータセットが必要です。実行後、モデル出力パスや学習パラメータなどはpropertiesとして取得でき、次のステップの入力などに使用できます。
- Condition:条件分岐のステップです。条件のリストと、TrueおよびFalseの際に実行するステップをそれぞれ設定します。
- BatchTransform:Batch Transform Jobを使用して、データセット全体にバッチ推論を行います。Transformerと入力データが必要です。実行後、入出力などのパラメータはpropertiesとして取得でき、次のステップの入力などに使用できます。
- RegisterModel:モデルグループに学習モデルを登録します。モデルグループに登録することで、モデルのバージョン管理やCI/CDが可能になります。
- CreateModel:Batch Transformationで使用するSageMaker Modelを作成します。学習ステップの出力であるArtifactを使用します。
上記を使用してパイプラインを定義した例が以下になります。この定義やpropertiesによりSageMaker Pipelinesは各ステップの依存関係を理解し、実行していくことになります。
from sagemaker.workflow.pipeline import Pipeline
pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_type,
processing_instance_count,
training_instance_type,
model_approval_status,
input_data,
batch_data,
],
steps=[step_process, step_train, step_eval, step_cond],
)
Amazon SageMaker Pipelinesを試す
実行するパイプライン
今回はAWSから提供されているサンプルコードを試してみます。このサンプルコードでは以下のように、特徴量生成・学習・モデル評価、性能が満足であれば推論用のモデル生成・モデル登録、バッチ変換を行うステップを定義・実行します。
データセット
データセットはUCI Machine Learning Abalone Datasetを使用し、アワビの形状などから個々の年齢を推定する問題設定に対して、XGBoostアルゴリズムで回帰を行うという流れになります。
ちなみに、アワビの貝殻の表面には木の年輪のような「輪紋」があり、その長さを調べることで年齢を知ることができるそうです。詳しくはこちらなどをご覧ください。
パイプラインの作成前にこのデータセットとバッチ変換用のデータセットをS3にアップロードしておきます。
まずアップロードのためにリージョンやバケツ名などを設定します。
import boto3
import sagemaker
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"AbaloneModelPackageGroupName"
データセットをいったんローカルにダウンロードしてからS3へアップロードします。
!mkdir -p data
local_path = "data/abalone-dataset.csv"
s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
"dataset/abalone-dataset.csv",
local_path
)
base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(
local_path=local_path,
desired_s3_uri=base_uri,
)
print(input_data_uri)
バッチ変換用のデータセットも同様にS3に配置します。
local_path = "data/abalone-dataset-batch"
s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
"dataset/abalone-dataset-batch",
local_path
)
base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
local_path=local_path,
desired_s3_uri=base_uri,
)
print(batch_data_uri)
パイプラインパラメータの準備
前処理や学習ジョブなどのインスタンスタイプや、データパスなどを定義し、パイプラインへの入力とします。このようにしておくことで、同じパイプラインをデータを替えて学習することができます。
今回定義するパラメータは以下の通りです。
- processing_instance_type - processing jobのインスタンスタイプ
- processing_instance_count - processing jobのインスタンス数
- training_instance_type - training jobのインスタンスタイプ
- model_approval_status - CI/CD用に学習済みモデルを登録するかのステータス
- input_data - 入力データのS3バケツURI
- batch_data - バッチデータのS3バケツURI
パラメータ定義の文法に従い、以下のように設定します。
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
)
processing_instance_count = ParameterInteger(
name="ProcessingInstanceCount",
default_value=1
)
processing_instance_type = ParameterString(
name="ProcessingInstanceType",
default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.m5.xlarge"
)
model_approval_status = ParameterString(
name="ModelApprovalStatus",
default_value="PendingManualApproval"
)
input_data = ParameterString(
name="InputData",
default_value=input_data_uri,
)
batch_data = ParameterString(
name="BatchData",
default_value=batch_data_uri,
)
特徴量生成
最初のステップとして、Processing Stepを使用してデータの取得、変数の標準化、カテゴリ変数のOne hot encodingを行ったのち、学習と検証、評価用にデータセットをそれぞれ分けます。
Processing Stepには処理スクリプト、Processor、入出力が必要になりますので、それぞれ準備していきます。
処理スクリプト
ローカルにabaloneディレクトリを作成し、その上に処理スクリプトを作成します。
!mkdir -p abalone
処理の流れとしては、データを読み込んだ後scikit-learnを使用して、数値データは欠損値の補間および標準化、カテゴリ変数は同様に欠損値補間の後One-hot encodingでテキストを数値に変換しています。最後にデータセットをランダムにtrain, validation, testに分け、それぞれS3に格納していきます。Processing Stepの設定でS3バケツと実行時のディレクトリを紐づけるため、この処理ではS3バケツを意識する必要はありません。
%%writefile abalone/preprocessing.py
import argparse
import os
import requests
import tempfile
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = [
"sex",
"length",
"diameter",
"height",
"whole_weight",
"shucked_weight",
"viscera_weight",
"shell_weight",
]
label_column = "rings"
feature_columns_dtype = {
"sex": str,
"length": np.float64,
"diameter": np.float64,
"height": np.float64,
"whole_weight": np.float64,
"shucked_weight": np.float64,
"viscera_weight": np.float64,
"shell_weight": np.float64
}
label_column_dtype = {"rings": np.float64}
def merge_two_dicts(x, y):
z = x.copy()
z.update(y)
return z
if __name__ == "__main__":
base_dir = "/opt/ml/processing"
df = pd.read_csv(
f"{base_dir}/input/abalone-dataset.csv",
header=None,
names=feature_columns_names + [label_column],
dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
)
numeric_features = list(feature_columns_names)
numeric_features.remove("sex")
numeric_transformer = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler())
]
)
categorical_features = ["sex"]
categorical_transformer = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
("onehot", OneHotEncoder(handle_unknown="ignore"))
]
)
preprocess = ColumnTransformer(
transformers=[
("num", numeric_transformer, numeric_features),
("cat", categorical_transformer, categorical_features)
]
)
y = df.pop("rings")
X_pre = preprocess.fit_transform(df)
y_pre = y.to_numpy().reshape(len(y), 1)
X = np.concatenate((y_pre, X_pre), axis=1)
np.random.shuffle(X)
train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
次にProcessorの実装です。ここではSKLearnProcessorが使われています。
from sagemaker.sklearn.processing import SKLearnProcessor
framework_version = "0.23-1"
sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
base_job_name="sklearn-abalone-process",
role=role,
)
SageMaker Processingを単独で実行する際にはrunによって入出力やスクリプトを指定すると思いますが、SageMaker Pipelinesの場合には以下のようにProcessingStepを定義することにより、パイプライン実行時にこのStepが実行されるようにします。
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
step_process = ProcessingStep(
name="AbaloneProcess",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
],
code="abalone/preprocessing.py",
)
学習ステップ
学習ステップでは、Amazon SageMakerで提供されるbuilt-in アルゴリズムのうちXGBoostを使用して回帰モデルを構築します。通常のトレーニングジョブを走らせるのと同様にEstimatorを定義し、それを実行するためにTrainingStepを書いていきます。
from sagemaker.estimator import Estimator
model_path = f"s3://{default_bucket}/AbaloneTrain"
image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region=region,
version="1.0-1",
py_version="py3",
instance_type=training_instance_type,
)
xgb_train = Estimator(
image_uri=image_uri,
instance_type=training_instance_type,
instance_count=1,
output_path=model_path,
role=role,
)
xgb_train.set_hyperparameters(
objective="reg:linear",
num_round=50,
max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.7,
silent=0
)
続いてTrainingStepを実装します。こちらもfitの代わりにTrainingStepを使用するイメージかと思います。TrainingInputとして、ProcessingStepで作成したstep_process
のproperties
からS3の出力パスを使用しています。このようにすることで、ProcessingStepとTrainingStepの依存関係が認識され間違いなくデータがやりとりできます。
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
name="AbaloneTrain",
estimator=xgb_train,
inputs={
"train": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"train"
].S3Output.S3Uri,
content_type="text/csv"
),
"validation": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"validation"
].S3Output.S3Uri,
content_type="text/csv"
)
},
)
学習済みモデルの評価
TrainingStepで学習したモデルの評価を行います。このStepにはProcessingStepが使用されていますので、特徴量抽出と同様に処理スクリプト・Processor・ProcessingStepを実装していきます。
処理内容はまず学習ステップで生成されたモデルと、特徴量生成ステップで生成されたtestデータを読み込みます。データに対して推論結果と正解値を付き合わせ2乗平均誤差と誤差の標準偏差をJSON形式で出力します。
%%writefile abalone/evaluation.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
from sklearn.metrics import mean_squared_error
if __name__ == "__main__":
model_path = f"/opt/ml/processing/model/model.tar.gz"
with tarfile.open(model_path) as tar:
tar.extractall(path=".")
model = pickle.load(open("xgboost-model", "rb"))
test_path = "/opt/ml/processing/test/test.csv"
df = pd.read_csv(test_path, header=None)
y_test = df.iloc[:, 0].to_numpy()
df.drop(df.columns[0], axis=1, inplace=True)
X_test = xgboost.DMatrix(df.values)
predictions = model.predict(X_test)
mse = mean_squared_error(y_test, predictions)
std = np.std(y_test - predictions)
report_dict = {
"regression_metrics": {
"mse": {
"value": mse,
"standard_deviation": std
},
},
}
output_dir = "/opt/ml/processing/evaluation"
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
evaluation_path = f"{output_dir}/evaluation.json"
with open(evaluation_path, "w") as f:
f.write(json.dumps(report_dict))
この処理は推論にXGBoostを使用しますので、ScriptProcessorで学習ステップと同じコンテナを使用しています。
from sagemaker.processing import ScriptProcessor
script_eval = ScriptProcessor(
image_uri=image_uri,
command=["python3"],
instance_type=processing_instance_type,
instance_count=1,
base_job_name="script-abalone-eval",
role=role,
)
最後にProcessingStepです。特徴量のステップと同じように書いていきますが、出力となるJSONパスを後段のStepに渡すため最初にPropertyFiel
を定義しています。また、先の処理と紐付けをするためにstep_train
およびstep_process
のproperties
を使用してS3バケツのURIを取得していることに注意してください。
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
name="EvaluationReport",
output_name="evaluation",
path="evaluation.json"
)
step_eval = ProcessingStep(
name="AbaloneEval",
processor=script_eval,
inputs=[
ProcessingInput(
source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
destination="/opt/ml/processing/model"
),
ProcessingInput(
source=step_process.properties.ProcessingOutputConfig.Outputs[
"test"
].S3Output.S3Uri,
destination="/opt/ml/processing/test"
)
],
outputs=[
ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
],
code="abalone/evaluation.py",
property_files=[evaluation_report],
)
モデル生成
順番的には条件分岐のステップですが、ConditionStepの実装において後段Stepのインスタンスが必要になりますので、先にこれらを定義しておきます。
ここではバッチ推論で使用する学習モデルに向けて、以下のように学習ステップで構築したモデルをSageMaker Modelとして実装します。
from sagemaker.model import Model
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=sagemaker_session,
role=role,
)
instance_type と accelerator_type をCreateModelStepの入力として定義し、CreateModelStepを実装します。
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep
inputs = CreateModelInput(
instance_type="ml.m5.large",
accelerator_type="ml.eia1.medium",
)
step_create_model = CreateModelStep(
name="AbaloneCreateModel",
model=model,
inputs=inputs,
)
バッチ変換
モデル生成ステップで生成したSageMaker Modelを使ってバッチ用データセットを変換します。SageMakerのBatch Transformation Jobを使用しますのでTransformerを先に実装し、バッチ用データセットと共にTransformationStepを実装します。
from sagemaker.transformer import Transformer
transformer = Transformer(
model_name=step_create_model.properties.ModelName,
instance_type="ml.m5.xlarge",
instance_count=1,
output_path=f"s3://{default_bucket}/AbaloneTransform"
)
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
step_transform = TransformStep(
name="AbaloneTransform",
transformer=transformer,
inputs=TransformInput(data=batch_data)
)
モデル登録
学習ステップで生成した学習済みモデルをModel Registryに登録します。登録されたモデルは自動でバージョン管理され、本番環境へのデプロイなどに使用可能になるようです。モデル登録のステップが実行されるとmodel packageが生成され指定されたmodel package groupに登録されます。
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
model_metrics = ModelMetrics(
model_statistics=MetricsSource(
s3_uri="{}/evaluation.json".format(
step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
),
content_type="application/json"
)
)
step_register = RegisterModel(
name="AbaloneRegisterModel",
estimator=xgb_train,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
条件分岐
条件分岐のステップはパイプラインの各ステップのプロパティに対し、条件に応じて実行するStepを変更する場合に使用します。サンプルのパイプラインではモデルの精度(今回だとMSE)が良い場合のみにバッチ変換を行うといった感じで使用されています。
条件を先に実装し、その条件と条件に応じて実行されるステップをConditionStepへの入力として実装します。
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
ConditionStep,
JsonGet,
)
cond_lte = ConditionLessThanOrEqualTo(
left=JsonGet(
step=step_eval,
property_file=evaluation_report,
json_path="regression_metrics.mse.value",
),
right=6.0
)
step_cond = ConditionStep(
name="AbaloneMSECond",
conditions=[cond_lte],
if_steps=[step_register, step_create_model, step_transform],
else_steps=[],
)
パイプラインの定義
ここまでで各ステップの実装が完了しました。各ステップを結合し、パイプラインを実行するためにPipelineを実装します。入力は前述したname
、parameters
、steps
です。
from sagemaker.workflow.pipeline import Pipeline
pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_type,
processing_instance_count,
training_instance_type,
model_approval_status,
input_data,
batch_data,
],
steps=[step_process, step_train, step_eval, step_cond],
)
作成したパイプライン定義はJSONで取得することもできます。
import json
definition = json.loads(pipeline.definition())
definition
パイプラインの実行
次のコマンドでSageMaker Pipelinesサービスに作成したパイプラインを登録します。この時、Training Jobなどに渡されるロールを入力として渡します。その後パイプラインを実行します。
pipeline.upsert(role_arn=role)
execution = pipeline.start()
実行結果
各Stepを定義し、パイプラインの実行まで完了しました。サンプルファイルのようにパイプラインの概要や評価結果、フローの概要などをノートブック上で確認することができます。
そのほか、SageMaker Studioから視覚的にパイプラインの概要を確認することできます。左のツールバーから「SageMaker Components and registries」を選択し、プルダウンで「Pipelines」を選択します。表示された「AbalonePipeline」を選択すると実行されたパイプラインのリストが表示されます。
リストの中の一つのアイテムをダブルクリックするとパイプラインやパラメータ、設定など実行の概要や進行状況を確認することができます。また、Graph中のStepをクリックすることで出力や設定など各ステップの詳細を確認することができます。
これらの情報は当然Amazon SageMakerの各サービスページでも確認することができます。Amazon SageMakerのトップページからProcessing Job、Training Job、Batch Transformation Job、Inferenceのmodelなどにそれぞれ実行結果があると思います。
まとめ
AWSから提供されているサンプルコードを通して、Amazon SageMaker Pipelinesの構成要素や実装方法、実行結果をみてきました。まだまだ開発途上な感じがあるものの、パイプラインがSageMakerに統合されたことで、他のサービスに移動しなくてもパイプラインの実行や結果の確認ができるようになり、また各ステップをモジュール化することで再利用性も高まると思われ、使いこなすことで開発効率がどんどん上がっていくんじゃないかんと思います。
SageMakerに興味のある方もない方もぜひお試しください!