7
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AWS StepFunctions を利用して機械学習パイプラインをつくってみる。

Last updated at Posted at 2022-12-09

画像2.png

はじめに

はじめまして、AMBL株式会社のOmAです。この記事はAMBL株式会社 Advent Calendar 2022の10日目の記事です。

この記事ではAWS Step Functionsを利用し、Amazon SagaMakerの各機能を連携させて機械学習パイプラインを作成します。

今回の機械学習パイプラインを作成するために、SageMakerのコードやStep FunctionsのAPIパラメータをどのように設定したのかを紹介したいと思います。

AWS Step Functionsの使用する機能

今回、Step Functionsで機械学習パイプラインを作成するためにStep Functions Workflow Studioを利用します。Workflow StudioはワークフローをGUIで確認、編集ができる機能です。

編集画面の左側からAWSの機能を選び、ドラック&ドロップでワークフローの構築ができます。

sf_WorkFlowStudio.PNG

Amazon SageMakerで使用する主な機能

SageMakerではデータの前処理にSageMaker ProcessingJob、学習にSageMakerの独自スクリプトの持ち込み(Bring Your Own Script)機能を使用します。

SageMaker ProcessingJobの概要

SageMaker ProcessingJobは任意のコードとコンテナを利用して処理を実行する機能です。

実行するコードはS3からコンテナに読み込まれ、実行されます。この時のコンテナはAWSから提供されているものだけではなく、独自に作成したコンテナも使用できます。ProcessongJobの実行後の出力はS3に保存されます。

この機能を利用して、学習データの前処理を実装します。

Processing-1.png
(引用:https://docs.aws.amazon.com/ja_jp/ja_jp/sagemaker/latest/dg/processing-job.html)

独自スクリプトの持ち込み(Bring Your Own Script)機能の概要

SageMakerのモデルの構築には大きく分けて以下の3つの選択肢があります。箇条書きの下の項目ほどコード量が増えますが、カスタマイズの自由度が増します。

  • 組み込みアルゴリズム :AWSが提供しているアルゴリズムを使用する
  • 独自スクリプトの持ち込み :モデルの構築・学習スクリプトを自作する
  • 独自コンテナの持ち込み :モデルの構築・学習スクリプトに加えて、コンテナも自作する

今回のモデルの学習では、コンテナはAWSから提供されているものを使用し、独自スクリプトで学習を行います。実装するモデルはLSTMで、学習はTensorflow2を利用しました。

機械学習パイプラインの作成

機械学習パイプラインの概要

今回のパイプラインはデータの前処理、モデルの学習、エンドポイントの設定、エンドポイントへのデプロイまでをAmazon SageMakerの機能を用いて行います。機械学習パイプラインの概要は以下の図にようになります。

pipeline_summary.PNG

Step Functions上のワークフローは以下のようになります。

workflow_summary.PNG

作成するステップの機能は以下の順番で作成します。

<コンポーネントの一覧>

  • CreateProcessingJob :データの前処理
  • CreateTrainingJob :モデルのトレーニング
  • CreateModel :モデルの推論処理の定義
  • CreateEndpointConfig:エンドポイントの設定を定義
  • DescribEndpoint :エンドポイントの有無を確認
  • UpdateEndpoint, CreateEndpoint :エンドポイントのアップデート、新規作成
  • DescribeEndpoint :エンドポイントが正常作成されたかを確認
  • Wait, Choice :エンドポイント作成までのループ処理

CreateProcessingJob

StepFunctionsのAPIパラメータ

{
  "ProcessingJobName.$": "States.Format('Preprocessing-{}',$.Execution.Name)",
  "ProcessingInputs": [
    {
      "InputName": "training_data",
      "S3Input": {
        "S3Uri": "s3://s3-bucket/input",
        "LocalPath": "/opt/ml/processing/input/data",
        "S3DataType": "S3Prefix",
        "S3InputMode": "File"
      }
    },
    {
      "InputName": "code",
      "S3Input": {
        "S3Uri": "s3://s3-bucket/code/processing_job/preprocess.py",
        "LocalPath": "/opt/ml/processing/input/code",
        "S3DataType": "S3Prefix",
        "S3InputMode": "File"
      }
    }
  ],
  "ProcessingOutputConfig": {
    "Outputs": [
      {
        "OutputName": "train",
        "S3Output": {
          "S3Uri": "s3://s3-bucket/train",
          "LocalPath": "/opt/ml/processing/output/train",
          "S3UploadMode": "EndOfJob"
        }
      },
      {
        "OutputName": "validation",
        "S3Output": {
          "S3Uri": "s3://s3-bucket/validation",
          "LocalPath": "/opt/ml/processing/output/validation",
          "S3UploadMode": "EndOfJob"
        }
      }
    ]
  },
  "AppSpecification": {
    "ImageUri": "246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3",
    "ContainerEntrypoint": [
      "python3",
      "/opt/ml/processing/input/code/preprocess.py"
    ],
    "ContainerArguments": [
      "288"
    ]
  },                       
  "RoleArn": "arn:aws:iam::000000000000:role/service-role/AmazonSageMaker-ExecutionDummyRole",
  "ProcessingResources": {
    "ClusterConfig": {
      "InstanceCount": 1,
      "InstanceType": "ml.m5.xlarge",
      "VolumeSizeInGB": 30
    }
  },
  "StoppingCondition": {
    "MaxRuntimeInSeconds": 3600
  }
}

APIパラメータの概要

  • ProcessingJobName
    JobNameはリージョンで一意に決める必要があります。

  • ProcessingInputs
    S3Uriにコンテナに持ち込みたいファイルのS3のpath、LocalPathにコンテナ上でファイルを配置する場所を指定します。コンテナ内のファイルの配置場所は'/opt/ml/processing/'から始まる必要があります。
    (参考:独自の処理コンテナを構築する (高度なシナリオ)

  • ProcessingOutputConfig
    S3Uriに出力するS3のpath、LocalPathにコンテナ上のアップロードしたいファイルのpathを指定します。
    コンテナ内のファイルの配置場所はProcessingInputsと同様に'/opt/ml/processing/'から始まる必要があります。

  • AppSpecification
    指定したDockerのImageUriでContainerEntrypointで指定されたファイルを起動する。

    • ImageUri
      DockerのコンテナImageを指定する。今回はWASが提供するscikit-learnのビルド済みコンテナを使用します。
      (参考:ビルド済みの Amazon SageMaker Scikit-learn および Spark ML 用の Docker イメージ

    • ContainerEntrypoint
      コンテナで実行する内容を指定します。
      ProcessingInputsで指定したcodeのLocalPathとEntrypointで実行するファイルパスを対応させる必要がああります。

    • ContainerArguments
      コンテナ内で実行されるコードのコマンドライン引数になります。

  • ProcessingResources
    インスタンスタイプや数を指定します。

実行ごとに作成されるStep FunctionsのContext オブジェクトのExecutionName を変数として使用して、一意な名前を作成しています。入力の変数を参照するには、パラメータを.$で終わる必要があります。
(参考:Context オブジェクト

エラー処理
CreateProcessingJobのエラーの内容に応じて、エラー処理の遷移先を決めることができます。
今回は全てのエラーに対してFailに遷移するように設定します。

  • Erros : States.ALL
  • Fallback state : Fail

CreateEndpointConfigまでの処理とエンドポイントが正常作成できない場合のエラー処理も同様に設定します。

SageMakerのコード(preprocess.py)

preprocess.py
import sys
import numpy as np
import pandas as pd
import pickle 

from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, accuracy_score
from sklearn.model_selection import train_test_split


if __name__ == "__main__":
    
    # コマンドライン引数の受け取り
    Input_dim = int(sys.argv[1])

    Seasoanl = 288
    Split_seasoanl_num = 3
    
    Bace_path = "/opt/ml/processing"

    # load airline_dataset
    df_  = pd.read_csv(Bace_path + "/input/data/temp_data_without_offset.csv")
    y = df_.max_.copy()

    dt = pd.date_range(start='2017/1/1 00:00:00',freq='5T',periods=len(y))
    df = pd.DataFrame({"ds": dt, "y": y})
    df = df.set_index('ds')


    # 学習時のデータサイズ
    train_size = (len(df)//Seasoanl - Split_seasoanl_num) * Seasoanl
    train_length = train_size - Input_dim

    # 検証時のデータサイズ
    val_size = len(df)-train_size
    val_length = val_size - Input_dim

    y = df.y.values.copy()
    y = y.reshape(-1,1)
    # 正規化する
    scaler = StandardScaler()
    y_scaled = scaler.fit_transform(y)

    # 学習データと検証データの切り分け
    train = y_scaled[:train_size, :]
    valid = y_scaled[-val_size:, :]

    x = []
    t = []

    v_x = []
    v_t = []
    # 学習データの整形
    for i in range(train_length):
        x.append(train[i:i+Input_dim])      # 説明変数
        t.append(train[i+1:i+Input_dim+1])  # 正解データ、1つずらした値

    else:
        x_train = np.array(x).reshape(train_length, Input_dim, 1) # 入力を(サンプル数、時系列の数、入力層のニューロン数)にする
        t_train = np.array(t).reshape(train_length, Input_dim, 1) # 説明変数(x_train)と同様のshape


    # 検証データの整形
    for i in range(val_length):
        v_x.append(valid[i:i+Input_dim])      # 説明変数
        v_t.append(train[i+1:i+Input_dim+1])  # 正解データ、1つずらした値

    else:
        x_validation = np.array(v_x).reshape(val_length, Input_dim, 1) # 入力を(サンプル数、時系列の数、入力層のニューロン数)にする
        t_validation = np.array(v_t).reshape(val_length, Input_dim, 1) # 説明変数(x_train)と同様のshape
        

    Train = (x_train, t_train)
    Valid = (x_validation, t_validation)
    

    # 出力ファイルの保存
    with open(Bace_path + '/output/train/train.pickle', 'wb') as f1:
        pickle.dump(Train, f1)
    
    with open(Bace_path + '/output/validation/validation.pickle', 'wb') as f2:
        pickle.dump(Valid, f2)
        

実装時の要点

  • コマンドライン引数の受け取り
    APIパラメータのContainerArgumentsで指定した値がコマンドライン引数として渡されます。

  • 出力ファイルの保存
    APIパラメータのProcessingOutputConfigのLocalPathに指定したPathにファイルを保存します。

CreateTrainingJob

StepFunctionsのAPIパラメータ

{
  "TrainingJobName.$": "States.Format('Training-{}',$$.Execution.Name)",
  "AlgorithmSpecification": {
    "TrainingImage": "763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-training:2.10.0-gpu-py39",
    "TrainingInputMode": "File"
  },
  "HyperParameters": {
    "sagemaker_submit_directory": "s3://s3-bucket/code/training_job/sourcedir.tar.gz",
    "sagemaker_program": "train.py",
    "input_dim": "288",
    "hidden_nums": "255,255",
    "epoch_num": "1",
    "batch_size": "256",
    "loss_fnc": "mean_squared_error",
    "optimizer": "Adam"
  },
  "InputDataConfig": [
    {
      "ChannelName": "train",
      "DataSource": {
        "S3DataSource": {
          "S3DataType": "S3Prefix",
          "S3Uri": "s3://s3-bucket/train"
        }
      },
      "InputMode": "File"
    }
  ],
  "OutputDataConfig": {
    "S3OutputPath": "s3://s3-bucket/model"
  },
  "ResourceConfig": {
    "InstanceCount": 1,
    "InstanceType": "ml.m4.xlarge",
    "VolumeSizeInGB": 10
  },
  "RoleArn": "arn:aws:iam::000000000000:role/service-role/AmazonSageMaker-ExecutionDummyRole"
  "StoppingCondition": {
    "MaxRuntimeInSeconds": 3600
  }
}

APIパラメータの概要

  • TrainingJobName
    ProcessingJobNameと同じく、リージョンで一意に決める必要があります。

  • AlgorithmSpecification

    • TrainingImage
      DockerのコンテナImageを指定します。
      今回はAWSが提供しているビルド済みのTensorflowコンテナを利用します。
      (参考:SageMaker TensorFlow Serving Container
  • HyperParameters
    Dockerコンテナの環境変数とタスク実行時に動作するコードに渡すハイパーパラメータを指定します。環境変数以外のハイパーパラメータはコードのコマンドライン引数として扱われます。

    • sagemaker_submit_directory
      コンテナに持ち込むコードを指定するコンテナの環境変数です。
      pythonファイルを.tart.gz形式に圧縮し、S3Uriを指定することでタスク開始時に読み込まれます。

    • sagemaker_program
      sagemaker_submit_directoryで持ち込んだpythonファイルのうち、タスク開始時に実行するコードを指定する環境変数です。

  • InputDataConfig
    コンテナ内に持ち込むデータを指定します。
    データはコンテナの環境変数 SM_CHANNEL_{ChannelName}へ読み込まれます。例えば、今回は SM_CHANNEL_TRAIN となる。

  • OutputDataConfig
    コンテナ内に出力されたモデルファイルを書き出す先を指定します。ファイルは環境変数(SM_MODEL_DIR)で指定されたフォルダ内のものが出力されます。

  • ResourceConfig
    インスタンスタイプや数を指定します。

SageMakerのコード(train.py)

train.py
import os
import argparse
import numpy as np
import pandas as pd
import pickle 


import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, LSTM


def _parsers_args():
    
    parser = argparse.ArgumentParser()
    
    # model params
    parser.add_argument('--input_dim', type=int, default = 288)
    parser.add_argument('--hidden_nums', type = str, default ="255,255")
    
    # training params
    parser.add_argument('--epoch_num', type = int, default = 1)
    parser.add_argument('--batch_size', type = int, default = 256)
    parser.add_argument('--loss_fnc', type = str, default = "mean_squared_error")
    parser.add_argument('--optimizer', type = str, default = "Adam")
    
    # 環境変数の設定
    # Container environment  modelの保存先、データセット(train, valid)の読み込み先
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))

    # fit() inputs (SM_CHANNEL_XXXX)
    parser.add_argument('--train_dir', type=str, default=os.environ.get('SM_CHANNEL_TRAIN')) 
    parser.add_argument('--valid_dir', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
    args = parser.parse_args()
    
    return args
    
    
def load_train_dataset(dataset_PATH):
    with open(dataset_PATH + '/train.pickle', 'rb') as f:
        train = pickle.load(f)
    
    return train


def create_model(input_dim = 288 , hidden_num = [512, 512], loss_fnc = "mean_squared_error", optim = "Adam"):
    
    # モデルの定義
    model = Sequential()

    # 隠れ層の定義
    model.add(LSTM(hidden_num[0], input_shape=(input_dim, 1), return_sequences = True))
    model.add(LSTM(hidden_num[1], return_sequences = True))

    # 出力層の定義
    model.add(Dense(1, activation="linear"))  # FC layer

    # 損失関数:MSE, 最適化関数:Adam (デフォルト引数の場合)
    model.compile(loss=loss_fnc, optimizer=optim)
    
    return model


if __name__ == "__main__":
    
    args = _parsers_args()
    
    # Set model params
    input_dim = args.input_dim
    hidden_nums = list(map(int, args.hidden_nums.split(',')))
    
    # Set training params
    epochs = args.epoch_num
    batch_size = args.batch_size
    loss_fnc = args.loss_fnc
    optim = args.optimizer
    
    # Read Dataset train and validation
    x_train, t_train = load_train_dataset(args.train_dir)
    
   # Creata lstm model
    model = create_model(input_dim = input_dim , hidden_num = hidden_nums, loss_fnc = loss_fnc, optim = optim)
    
    # Run Training
    model.fit(x_train, t_train, epochs = epochs, batch_size = batch_size)
    
    # モデルの保存 
    model.save(args.model_dir + "/1", save_format="tf")

実装時の要点

  • パイパーパラメータについて
     コンテナに環境変数と設定されていない変数は、APIパラメータで設定した変数名がキーワード引数となり実行時に受け渡されます。

  • 環境変数について
     コンテナに予め設定されていて、環境変数はos.environ.get('環境変数名')でそれぞれの値を参照できます。
    (参考:SageMaker Training Toolkit - ENVIRONMENT_VARIABLES.md 日本語版

  • モデルの保存について
     Tensorflowの場合、モデルの保存形式はsave_format="tf"であり、保存先のディレクトリ名は数字である必要があります。

CreateModel

StepFunctionsのAPIパラメータ

{
  "ModelName.$": "States.Format('Model_LSTM-{}',$$.Execution.Name)",
  "ExecutionRoleArn": "arn:aws:iam::000000000000:role/service-role/AmazonSageMaker-ExecutionDummyRole",
  "PrimaryContainer": {
    "Environment": {
    },
    "Image": "763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:2.10.0-cpu-py39",
    "ModelDataUrl.$": "$.ModelArtifacts.S3ModelArtifacts"
  }
}

APIパラメータの概要

  • ModelName
    ProcessingJobName と同様に設定します。

  • PrimaryContainer

    • Environment
      コンテナの環境変数を指定することができます。今回は使用しませんでした。
  • Image
    CreateTrainingJobのTrainingImageと同様に設定します。
    (参考:SageMaker TensorFlow Serving Container

  • ModelDataUrl
    モデルが格納されているS3Urlを指定します。
    学習毎に作成されるモデルは$.ModelArtifacts.S3ModelArtifacts で参照できます。

CreateEndpointConfig

StepFunctionsのAPIパラメータ

{
  "EndpointConfigName.$": "States.Format('EndpointConfig-{}',$$.Execution.Name)",
  "ProductionVariants": [
    {
      "ServerlessConfig": {
        "MaxConcurrency": 3,
        "MemorySizeInMB": 5120
      },
      "ModelName.$": "States.Format('Model_LSTM-{}',$$.Execution.Name)",
      "VariantName": "gpu"
    }
  ]
}

APIパラメータの概要

  • EndpointConfigName
    ProcessingJobName と同様に設定します。

  • ProductionVariants
    今回は Serverless エンドポイントを使用するので、ServerlessConfigを指定します。

    • ModelName
      CreateModel 時に指定した名前を指定します。

DescribeEndpoint

StepFunctionsのAPIパラメータ

{
  "EndpointName": "LSTM_Endpoint"
}

APIパラメータの概要

  • EndpointName
    状態を取得したいエンドポイント名を指定します。

ここでは指定したエンドポインの状態を利用し分岐処理を行います。
既にエンドポイントが存在する場合はCreateEndpointへ遷移し、存在しない場合はUpdateEndpointへ遷移します。エンドポイントが存在しない場合はエラーとなるので、エラー処理で分岐を実現しています。

エラー処理

  • Errors : States.ALL
  • Fallback state : CreateEndpoint

UpdateEndpoint

StepFunctionsのAPIパラメータ

{
  "EndpointConfigName.$": "States.Format('EndpointConfig-{}',$$.Execution.Name)",
  "EndpointName": "LSTM_Endpoint"
}

APIパラメータの概要

  • EndpointConfigName
    CreateEndPointConfigで指定した名前を指定します。

  • EndpointName
    更新先のエンドポイント名を指定します。

CreateEndpoint

StepFunctionsのAPIパラメータ

{
  "EndpointConfigName.$": "States.Format('EndpointConfig-{}',$$.Execution.Name)",
  "EndpointName": "LSTM_Endpoint"
}

APIパラメータの概要

  • EndpointConfigName
    CreateEndPointConfigで指定した名前を指定します。

  • EndpointName
    作成するエンドポイント名を指定します。

DescribeEndpoint

Step Functionsは同じ名前をコンポーネントにつけることができないため、適宜名前を変更してください。

StepFunctionsのAPIパラメータ

{
    "EndpointName": "LSTM_Endpoint"
}

APIパラメータの概要

  • EndpointName
    状態を取得したいエンドポイント名を指定します。

ここではデプロイしたモデルの状態を確認します。このエンドポイントの状態に応じて次のChoiceの処理が分岐します。

Wait

概要
指定された時間、実行を待機します。
エンドポイントの状態を確認する時間間隔を設定します。


  • 30 seconds

Choice

概要
エンドポイントの状態によって処理を分岐させます。
エンドポイントが"InService"の場合にSuccessへ、"Creating"と"Updating"の場合に直前のDescribeEndpointへ遷移し、それ以外は失敗処理に遷移します。

  • Rule #1
    If these conditions are true : $.EndpointStatus == "Creating" or $.EndpointStatus == "Updating

    • 条件1
      • Variable : $.EndpointStatus
      • Operator▼ : is equal to
      • Value▼ : String constant
      • Value : Creating
    • 条件2
      • Variable : $.EndpointStatus
      • Operator▼ : is equal to
      • Value▼ : String constant
      • Value : Creating

    Then next state is : DescribeEndpoint(1)

  • Rule #2
    If these conditions are true : $.EndpointStatus == "InService"

    • 条件1
      • Variable : $.EndpointStatus
      • Operator▼ : is equal to
      • Value▼ : String constant
      • Value : Creating

    Then next state is : Success

  • Default rule

    • Default state : Fail

おわりに

今回はSageMakerのコードやStep FunctionsのAPIパラメータをどのように設定して機械学習パイプラインを作成したのかを紹介しました。
コンテナ内の環境変数をAPIパラメータでどのように設定すると正しく動くのか、という点でかなり苦労しました。Step FunctionsのAIPパラメータがPythonのBoto3の内容と似ていることに気付かなければ、より苦労していたかもしれません。

AWSでの機械学習パイプラインの構築方法はStep Functionsを利用する以外に、Amazon SageMaker Pipelinesを利用する方法があります。今度はこのAmazon SageMaker Pipelines利用した機械学習パイプラインの構築についても触れてみたいと思います。

7
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?