はじめに
はじめまして、AMBL株式会社のOmAです。この記事はAMBL株式会社 Advent Calendar 2022の10日目の記事です。
この記事ではAWS Step Functionsを利用し、Amazon SagaMakerの各機能を連携させて機械学習パイプラインを作成します。
今回の機械学習パイプラインを作成するために、SageMakerのコードやStep FunctionsのAPIパラメータをどのように設定したのかを紹介したいと思います。
AWS Step Functionsの使用する機能
今回、Step Functionsで機械学習パイプラインを作成するためにStep Functions Workflow Studioを利用します。Workflow StudioはワークフローをGUIで確認、編集ができる機能です。
編集画面の左側からAWSの機能を選び、ドラック&ドロップでワークフローの構築ができます。
Amazon SageMakerで使用する主な機能
SageMakerではデータの前処理にSageMaker ProcessingJob、学習にSageMakerの独自スクリプトの持ち込み(Bring Your Own Script)機能を使用します。
SageMaker ProcessingJobの概要
SageMaker ProcessingJobは任意のコードとコンテナを利用して処理を実行する機能です。
実行するコードはS3からコンテナに読み込まれ、実行されます。この時のコンテナはAWSから提供されているものだけではなく、独自に作成したコンテナも使用できます。ProcessongJobの実行後の出力はS3に保存されます。
この機能を利用して、学習データの前処理を実装します。
(引用:https://docs.aws.amazon.com/ja_jp/ja_jp/sagemaker/latest/dg/processing-job.html)
独自スクリプトの持ち込み(Bring Your Own Script)機能の概要
SageMakerのモデルの構築には大きく分けて以下の3つの選択肢があります。箇条書きの下の項目ほどコード量が増えますが、カスタマイズの自由度が増します。
- 組み込みアルゴリズム :AWSが提供しているアルゴリズムを使用する
- 独自スクリプトの持ち込み :モデルの構築・学習スクリプトを自作する
- 独自コンテナの持ち込み :モデルの構築・学習スクリプトに加えて、コンテナも自作する
今回のモデルの学習では、コンテナはAWSから提供されているものを使用し、独自スクリプトで学習を行います。実装するモデルはLSTMで、学習はTensorflow2を利用しました。
機械学習パイプラインの作成
機械学習パイプラインの概要
今回のパイプラインはデータの前処理、モデルの学習、エンドポイントの設定、エンドポイントへのデプロイまでをAmazon SageMakerの機能を用いて行います。機械学習パイプラインの概要は以下の図にようになります。
Step Functions上のワークフローは以下のようになります。
作成するステップの機能は以下の順番で作成します。
<コンポーネントの一覧>
- CreateProcessingJob :データの前処理
- CreateTrainingJob :モデルのトレーニング
- CreateModel :モデルの推論処理の定義
- CreateEndpointConfig:エンドポイントの設定を定義
- DescribEndpoint :エンドポイントの有無を確認
- UpdateEndpoint, CreateEndpoint :エンドポイントのアップデート、新規作成
- DescribeEndpoint :エンドポイントが正常作成されたかを確認
- Wait, Choice :エンドポイント作成までのループ処理
CreateProcessingJob
StepFunctionsのAPIパラメータ
{
"ProcessingJobName.$": "States.Format('Preprocessing-{}',$.Execution.Name)",
"ProcessingInputs": [
{
"InputName": "training_data",
"S3Input": {
"S3Uri": "s3://s3-bucket/input",
"LocalPath": "/opt/ml/processing/input/data",
"S3DataType": "S3Prefix",
"S3InputMode": "File"
}
},
{
"InputName": "code",
"S3Input": {
"S3Uri": "s3://s3-bucket/code/processing_job/preprocess.py",
"LocalPath": "/opt/ml/processing/input/code",
"S3DataType": "S3Prefix",
"S3InputMode": "File"
}
}
],
"ProcessingOutputConfig": {
"Outputs": [
{
"OutputName": "train",
"S3Output": {
"S3Uri": "s3://s3-bucket/train",
"LocalPath": "/opt/ml/processing/output/train",
"S3UploadMode": "EndOfJob"
}
},
{
"OutputName": "validation",
"S3Output": {
"S3Uri": "s3://s3-bucket/validation",
"LocalPath": "/opt/ml/processing/output/validation",
"S3UploadMode": "EndOfJob"
}
}
]
},
"AppSpecification": {
"ImageUri": "246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3",
"ContainerEntrypoint": [
"python3",
"/opt/ml/processing/input/code/preprocess.py"
],
"ContainerArguments": [
"288"
]
},
"RoleArn": "arn:aws:iam::000000000000:role/service-role/AmazonSageMaker-ExecutionDummyRole",
"ProcessingResources": {
"ClusterConfig": {
"InstanceCount": 1,
"InstanceType": "ml.m5.xlarge",
"VolumeSizeInGB": 30
}
},
"StoppingCondition": {
"MaxRuntimeInSeconds": 3600
}
}
APIパラメータの概要
-
ProcessingJobName
JobNameはリージョンで一意に決める必要があります。 -
ProcessingInputs
S3Uriにコンテナに持ち込みたいファイルのS3のpath、LocalPathにコンテナ上でファイルを配置する場所を指定します。コンテナ内のファイルの配置場所は'/opt/ml/processing/'から始まる必要があります。
(参考:独自の処理コンテナを構築する (高度なシナリオ)) -
ProcessingOutputConfig
S3Uriに出力するS3のpath、LocalPathにコンテナ上のアップロードしたいファイルのpathを指定します。
コンテナ内のファイルの配置場所はProcessingInputsと同様に'/opt/ml/processing/'から始まる必要があります。 -
AppSpecification
指定したDockerのImageUriでContainerEntrypointで指定されたファイルを起動する。-
ImageUri
DockerのコンテナImageを指定する。今回はWASが提供するscikit-learnのビルド済みコンテナを使用します。
(参考:ビルド済みの Amazon SageMaker Scikit-learn および Spark ML 用の Docker イメージ) -
ContainerEntrypoint
コンテナで実行する内容を指定します。
ProcessingInputsで指定したcodeのLocalPathとEntrypointで実行するファイルパスを対応させる必要がああります。 -
ContainerArguments
コンテナ内で実行されるコードのコマンドライン引数になります。
-
-
ProcessingResources
インスタンスタイプや数を指定します。
実行ごとに作成されるStep FunctionsのContext オブジェクトのExecutionName を変数として使用して、一意な名前を作成しています。入力の変数を参照するには、パラメータを.$
で終わる必要があります。
(参考:Context オブジェクト)
エラー処理
CreateProcessingJobのエラーの内容に応じて、エラー処理の遷移先を決めることができます。
今回は全てのエラーに対してFailに遷移するように設定します。
- Erros : States.ALL
- Fallback state : Fail
CreateEndpointConfigまでの処理とエンドポイントが正常作成できない場合のエラー処理も同様に設定します。
SageMakerのコード(preprocess.py)
import sys
import numpy as np
import pandas as pd
import pickle
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, accuracy_score
from sklearn.model_selection import train_test_split
if __name__ == "__main__":
# コマンドライン引数の受け取り
Input_dim = int(sys.argv[1])
Seasoanl = 288
Split_seasoanl_num = 3
Bace_path = "/opt/ml/processing"
# load airline_dataset
df_ = pd.read_csv(Bace_path + "/input/data/temp_data_without_offset.csv")
y = df_.max_.copy()
dt = pd.date_range(start='2017/1/1 00:00:00',freq='5T',periods=len(y))
df = pd.DataFrame({"ds": dt, "y": y})
df = df.set_index('ds')
# 学習時のデータサイズ
train_size = (len(df)//Seasoanl - Split_seasoanl_num) * Seasoanl
train_length = train_size - Input_dim
# 検証時のデータサイズ
val_size = len(df)-train_size
val_length = val_size - Input_dim
y = df.y.values.copy()
y = y.reshape(-1,1)
# 正規化する
scaler = StandardScaler()
y_scaled = scaler.fit_transform(y)
# 学習データと検証データの切り分け
train = y_scaled[:train_size, :]
valid = y_scaled[-val_size:, :]
x = []
t = []
v_x = []
v_t = []
# 学習データの整形
for i in range(train_length):
x.append(train[i:i+Input_dim]) # 説明変数
t.append(train[i+1:i+Input_dim+1]) # 正解データ、1つずらした値
else:
x_train = np.array(x).reshape(train_length, Input_dim, 1) # 入力を(サンプル数、時系列の数、入力層のニューロン数)にする
t_train = np.array(t).reshape(train_length, Input_dim, 1) # 説明変数(x_train)と同様のshape
# 検証データの整形
for i in range(val_length):
v_x.append(valid[i:i+Input_dim]) # 説明変数
v_t.append(train[i+1:i+Input_dim+1]) # 正解データ、1つずらした値
else:
x_validation = np.array(v_x).reshape(val_length, Input_dim, 1) # 入力を(サンプル数、時系列の数、入力層のニューロン数)にする
t_validation = np.array(v_t).reshape(val_length, Input_dim, 1) # 説明変数(x_train)と同様のshape
Train = (x_train, t_train)
Valid = (x_validation, t_validation)
# 出力ファイルの保存
with open(Bace_path + '/output/train/train.pickle', 'wb') as f1:
pickle.dump(Train, f1)
with open(Bace_path + '/output/validation/validation.pickle', 'wb') as f2:
pickle.dump(Valid, f2)
実装時の要点
-
コマンドライン引数の受け取り
APIパラメータのContainerArgumentsで指定した値がコマンドライン引数として渡されます。 -
出力ファイルの保存
APIパラメータのProcessingOutputConfigのLocalPathに指定したPathにファイルを保存します。
CreateTrainingJob
StepFunctionsのAPIパラメータ
{
"TrainingJobName.$": "States.Format('Training-{}',$$.Execution.Name)",
"AlgorithmSpecification": {
"TrainingImage": "763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-training:2.10.0-gpu-py39",
"TrainingInputMode": "File"
},
"HyperParameters": {
"sagemaker_submit_directory": "s3://s3-bucket/code/training_job/sourcedir.tar.gz",
"sagemaker_program": "train.py",
"input_dim": "288",
"hidden_nums": "255,255",
"epoch_num": "1",
"batch_size": "256",
"loss_fnc": "mean_squared_error",
"optimizer": "Adam"
},
"InputDataConfig": [
{
"ChannelName": "train",
"DataSource": {
"S3DataSource": {
"S3DataType": "S3Prefix",
"S3Uri": "s3://s3-bucket/train"
}
},
"InputMode": "File"
}
],
"OutputDataConfig": {
"S3OutputPath": "s3://s3-bucket/model"
},
"ResourceConfig": {
"InstanceCount": 1,
"InstanceType": "ml.m4.xlarge",
"VolumeSizeInGB": 10
},
"RoleArn": "arn:aws:iam::000000000000:role/service-role/AmazonSageMaker-ExecutionDummyRole"
"StoppingCondition": {
"MaxRuntimeInSeconds": 3600
}
}
APIパラメータの概要
-
TrainingJobName
ProcessingJobNameと同じく、リージョンで一意に決める必要があります。 -
AlgorithmSpecification
- TrainingImage
DockerのコンテナImageを指定します。
今回はAWSが提供しているビルド済みのTensorflowコンテナを利用します。
(参考:SageMaker TensorFlow Serving Container)
- TrainingImage
-
HyperParameters
Dockerコンテナの環境変数とタスク実行時に動作するコードに渡すハイパーパラメータを指定します。環境変数以外のハイパーパラメータはコードのコマンドライン引数として扱われます。-
sagemaker_submit_directory
コンテナに持ち込むコードを指定するコンテナの環境変数です。
pythonファイルを.tart.gz形式に圧縮し、S3Uriを指定することでタスク開始時に読み込まれます。 -
sagemaker_program
sagemaker_submit_directoryで持ち込んだpythonファイルのうち、タスク開始時に実行するコードを指定する環境変数です。
-
-
InputDataConfig
コンテナ内に持ち込むデータを指定します。
データはコンテナの環境変数 SM_CHANNEL_{ChannelName}へ読み込まれます。例えば、今回は SM_CHANNEL_TRAIN となる。 -
OutputDataConfig
コンテナ内に出力されたモデルファイルを書き出す先を指定します。ファイルは環境変数(SM_MODEL_DIR)で指定されたフォルダ内のものが出力されます。 -
ResourceConfig
インスタンスタイプや数を指定します。
SageMakerのコード(train.py)
import os
import argparse
import numpy as np
import pandas as pd
import pickle
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, LSTM
def _parsers_args():
parser = argparse.ArgumentParser()
# model params
parser.add_argument('--input_dim', type=int, default = 288)
parser.add_argument('--hidden_nums', type = str, default ="255,255")
# training params
parser.add_argument('--epoch_num', type = int, default = 1)
parser.add_argument('--batch_size', type = int, default = 256)
parser.add_argument('--loss_fnc', type = str, default = "mean_squared_error")
parser.add_argument('--optimizer', type = str, default = "Adam")
# 環境変数の設定
# Container environment modelの保存先、データセット(train, valid)の読み込み先
parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
# fit() inputs (SM_CHANNEL_XXXX)
parser.add_argument('--train_dir', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
parser.add_argument('--valid_dir', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
args = parser.parse_args()
return args
def load_train_dataset(dataset_PATH):
with open(dataset_PATH + '/train.pickle', 'rb') as f:
train = pickle.load(f)
return train
def create_model(input_dim = 288 , hidden_num = [512, 512], loss_fnc = "mean_squared_error", optim = "Adam"):
# モデルの定義
model = Sequential()
# 隠れ層の定義
model.add(LSTM(hidden_num[0], input_shape=(input_dim, 1), return_sequences = True))
model.add(LSTM(hidden_num[1], return_sequences = True))
# 出力層の定義
model.add(Dense(1, activation="linear")) # FC layer
# 損失関数:MSE, 最適化関数:Adam (デフォルト引数の場合)
model.compile(loss=loss_fnc, optimizer=optim)
return model
if __name__ == "__main__":
args = _parsers_args()
# Set model params
input_dim = args.input_dim
hidden_nums = list(map(int, args.hidden_nums.split(',')))
# Set training params
epochs = args.epoch_num
batch_size = args.batch_size
loss_fnc = args.loss_fnc
optim = args.optimizer
# Read Dataset train and validation
x_train, t_train = load_train_dataset(args.train_dir)
# Creata lstm model
model = create_model(input_dim = input_dim , hidden_num = hidden_nums, loss_fnc = loss_fnc, optim = optim)
# Run Training
model.fit(x_train, t_train, epochs = epochs, batch_size = batch_size)
# モデルの保存
model.save(args.model_dir + "/1", save_format="tf")
実装時の要点
-
パイパーパラメータについて
コンテナに環境変数と設定されていない変数は、APIパラメータで設定した変数名がキーワード引数となり実行時に受け渡されます。 -
環境変数について
コンテナに予め設定されていて、環境変数はos.environ.get('環境変数名')
でそれぞれの値を参照できます。
(参考:SageMaker Training Toolkit - ENVIRONMENT_VARIABLES.md 日本語版) -
モデルの保存について
Tensorflowの場合、モデルの保存形式はsave_format="tf"
であり、保存先のディレクトリ名は数字である必要があります。
CreateModel
StepFunctionsのAPIパラメータ
{
"ModelName.$": "States.Format('Model_LSTM-{}',$$.Execution.Name)",
"ExecutionRoleArn": "arn:aws:iam::000000000000:role/service-role/AmazonSageMaker-ExecutionDummyRole",
"PrimaryContainer": {
"Environment": {
},
"Image": "763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:2.10.0-cpu-py39",
"ModelDataUrl.$": "$.ModelArtifacts.S3ModelArtifacts"
}
}
APIパラメータの概要
-
ModelName
ProcessingJobName と同様に設定します。 -
PrimaryContainer
- Environment
コンテナの環境変数を指定することができます。今回は使用しませんでした。
- Environment
-
Image
CreateTrainingJobのTrainingImageと同様に設定します。
(参考:SageMaker TensorFlow Serving Container) -
ModelDataUrl
モデルが格納されているS3Urlを指定します。
学習毎に作成されるモデルは$.ModelArtifacts.S3ModelArtifacts で参照できます。
CreateEndpointConfig
StepFunctionsのAPIパラメータ
{
"EndpointConfigName.$": "States.Format('EndpointConfig-{}',$$.Execution.Name)",
"ProductionVariants": [
{
"ServerlessConfig": {
"MaxConcurrency": 3,
"MemorySizeInMB": 5120
},
"ModelName.$": "States.Format('Model_LSTM-{}',$$.Execution.Name)",
"VariantName": "gpu"
}
]
}
APIパラメータの概要
-
EndpointConfigName
ProcessingJobName と同様に設定します。 -
ProductionVariants
今回は Serverless エンドポイントを使用するので、ServerlessConfigを指定します。- ModelName
CreateModel 時に指定した名前を指定します。
- ModelName
DescribeEndpoint
StepFunctionsのAPIパラメータ
{
"EndpointName": "LSTM_Endpoint"
}
APIパラメータの概要
- EndpointName
状態を取得したいエンドポイント名を指定します。
ここでは指定したエンドポインの状態を利用し分岐処理を行います。
既にエンドポイントが存在する場合はCreateEndpointへ遷移し、存在しない場合はUpdateEndpointへ遷移します。エンドポイントが存在しない場合はエラーとなるので、エラー処理で分岐を実現しています。
エラー処理
- Errors : States.ALL
- Fallback state : CreateEndpoint
UpdateEndpoint
StepFunctionsのAPIパラメータ
{
"EndpointConfigName.$": "States.Format('EndpointConfig-{}',$$.Execution.Name)",
"EndpointName": "LSTM_Endpoint"
}
APIパラメータの概要
-
EndpointConfigName
CreateEndPointConfigで指定した名前を指定します。 -
EndpointName
更新先のエンドポイント名を指定します。
CreateEndpoint
StepFunctionsのAPIパラメータ
{
"EndpointConfigName.$": "States.Format('EndpointConfig-{}',$$.Execution.Name)",
"EndpointName": "LSTM_Endpoint"
}
APIパラメータの概要
-
EndpointConfigName
CreateEndPointConfigで指定した名前を指定します。 -
EndpointName
作成するエンドポイント名を指定します。
DescribeEndpoint
Step Functionsは同じ名前をコンポーネントにつけることができないため、適宜名前を変更してください。
StepFunctionsのAPIパラメータ
{
"EndpointName": "LSTM_Endpoint"
}
APIパラメータの概要
- EndpointName
状態を取得したいエンドポイント名を指定します。
ここではデプロイしたモデルの状態を確認します。このエンドポイントの状態に応じて次のChoiceの処理が分岐します。
Wait
概要
指定された時間、実行を待機します。
エンドポイントの状態を確認する時間間隔を設定します。
- 秒
30 seconds
Choice
概要
エンドポイントの状態によって処理を分岐させます。
エンドポイントが"InService"の場合にSuccessへ、"Creating"と"Updating"の場合に直前のDescribeEndpointへ遷移し、それ以外は失敗処理に遷移します。
-
Rule #1
If these conditions are true :$.EndpointStatus == "Creating" or $.EndpointStatus == "Updating
- 条件1
- Variable : $.EndpointStatus
- Operator▼ : is equal to
- Value▼ : String constant
- Value : Creating
- 条件2
- Variable : $.EndpointStatus
- Operator▼ : is equal to
- Value▼ : String constant
- Value : Creating
Then next state is : DescribeEndpoint(1)
- 条件1
-
Rule #2
If these conditions are true :$.EndpointStatus == "InService"
- 条件1
- Variable : $.EndpointStatus
- Operator▼ : is equal to
- Value▼ : String constant
- Value : Creating
Then next state is : Success
- 条件1
-
Default rule
- Default state : Fail
おわりに
今回はSageMakerのコードやStep FunctionsのAPIパラメータをどのように設定して機械学習パイプラインを作成したのかを紹介しました。
コンテナ内の環境変数をAPIパラメータでどのように設定すると正しく動くのか、という点でかなり苦労しました。Step FunctionsのAIPパラメータがPythonのBoto3の内容と似ていることに気付かなければ、より苦労していたかもしれません。
AWSでの機械学習パイプラインの構築方法はStep Functionsを利用する以外に、Amazon SageMaker Pipelinesを利用する方法があります。今度はこのAmazon SageMaker Pipelines利用した機械学習パイプラインの構築についても触れてみたいと思います。