独自モデルを使ったバッチ推論を構築しよう
バッチ処理で推論するパイプラインを構築します。機械学習モデルと推論スクリプトは独自に作成したものを使います。
この記事はハンズオン形式です。順に進めることでゼロからパイプラインを構築します。もちろん部分的な参照もできます。その場合は、「ハンズオンの目次」 へ進んで下さい。
説明は画像を用いず文字で伝えるようにしました。調べたい情報がこの記事に存在しているかgrepできるようにしています。
推論処理の例はPyTorchの場合です。その他のフレームワークを利用している場合は、適宜読み替えて下さい。
はじめに
想定する読者
- 独自のLLM(機械学習モデル)を構築して運用(デプロイ)したい人
- 推論結果に即時性を求めない人。「夜間や週末に処理できていればいい」
- まずは動くものを作りたい人。「完璧な(?)MLOpsは徐々に作ればいい」
前提知識
- AWS S3は何となく使ったことがある
- Pythonで推論スクリプトを作成できる(ローカルで動かした経験があればOK)
扱わない内容
- SageMaker Studio(Pipelines) について
- 推論結果がすぐに必要な場合。「コールドスタートを許容できない」
- 機械学習モデルの構築(性能向上)
- 推論スクリプトの作成
全体の構成と利用するサービス
利用するサービスは以下の通りです。
- S3
- SageMaker
- Step Functions
下記フローチャートをStep Functionsで作成します。矢印はデータの流れです。
ハンズオンの目次
以下の手順で各処理を1つずつ作ります。それぞれで 動くこと を確認しながら次のステップへ進みます。
- 事前準備
- SageMaker ProcessingJobを作る
- SageMakerにモデルを作る
- SageMaker BatchTransformを動かす
- AWS Step Functionsを作る
事前準備(IAM/S3)
SageMaker実行用IAMの作成
IAM > ロール から ロールの作成 を開始します。以下を選択して「次へ」進みます。
- 信頼されたエンティティタイプ: AWS のサービス
- ユースケース: SageMaker
「許可の境界を設定」 することが出来ますが、ひとまずこのまま「次へ」進みます。
最後にロール名を入力して作成を完了させて下さい。
AmazonSageMakerFullAccess 権限が付与されます。この記事では、まずは動くことを目指しています。動くことが確認できたら、ここに戻って来て権限を見直すと良いでしょう。
S3にバケットを作る
S3にバケットを作成します。そして、バケット内に以下のフォルダを作成します。
/input
/processing-output
/output
「アクセス許可」から以下の「バケットポリシー」を追加します。
Resource にある bucket-name は作成したバケット名に置き換えます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": [
ここは作成したIAMロールのARNに置き換える
]
},
"Action": [
"s3:ListBucket*",
"s3:GetBucket*",
"s3:GetObject",
"s3:PutObject",
],
"Resource": [
"arn:aws:s3:::bucket-name",
"arn:aws:s3:::bucket-name/*"
]
}
]
}
SageMaker ProcessingJobを作る
独自コンテナとスクリプトを作る
ProcessingJobで前処理を実行するにはコンテナを用意します。コンテナには実行したいスクリプトを含めます。
S3のファイルを操作するにはProcessingJob作成時にS3のURIとコンテナのパスを指定します。
S3とコンテナ間のファイルコピーは自動的に実行されます。パスの指定は後述します。
まずはスクリプトを作成します。 変換したファイルはS3へコピーするコンテナ内のパスにファイルを出力します。
このとき \n
など特殊な文字を取り除きます。
ここでCSVを出力したことを前提に後述の説明を進めます。前処理を行うファイルはExcelでも構いませんが、変換後はCSVを出力して下さい。
この先で作成する全てのスクリプトで以下を強く推奨します。
(私はデバック作業でとても苦しみました)
- どこまで実行されたかわかるように
print()
を挿入する -
try
でエラーを捕まえてprint()
で出力する
# インポートが必要な場合は忘れずに
# import ...
def main():
input_path = "/opt/ml/processing/input"
export_path = "/opt/ml/processing/output"
# 例)pandas DataFrameで必要な前処理を行ったとして
# df.to_csv(os.path.join(export_path, file_name), header=False)
if __name__ == "__main__":
main()
前処理用のスクリプトを作成したら、Dockerfileを作成します。イメージを作成したらECRにプッシュします。
FROM --platform=linux/amd64 python:3.11-slim-buster AS build
# 必要なものをインストールする
# RUN pip install
ADD processing_script.py /
ENTRYPOINT ["python3", "/processing_script.py"]
Processing Jobを動かす
S3 /input
に前処理を行うファイルをアップロードします。
SageMakerのページから Processing > 処理ジョブ を開き、「処理ジョブを作成」を行います。
下記をそれぞれ入力します。ジョブは毎回新しく作成します。(私は1度作ったジョブを使い回せると思っていました。)
名称は一意にします。ひとまずDebugで繰り返し実行できるように適当な番号を含めます。(例: job-name-1)
- ジョブ名
- コンテナ
- IAM ロール
- リソース設定
インスタンス料金については以下のドキュメントから確認できます。
最後に以下を入力します。「ローカルパス」には processing_script.py
で指定したパスを入力します。「S3 の場所」にはS3の入出力先のURIを入力します。
- 入力データ設定
- 出力データ設定
処理を開始するためには「送信」を行います。処理結果の確認は指定したS3にファイルが出力されていることを確認します。
出力されたファイルを確認したらSageMakerにモデルを作るに進みます。もし、意図しない状態やエラーが発生した場合は、仕込んでおいた print()
を頼りに解決します。
SageMakerにモデルを作る
作成した独自モデルをSageMakerに追加します。このモデルを使ってBatchTransform(推論処理)を実行します。
モデルパッケージの全体像
./
├── model.pth
└── code
├── inference.py
└── requirements.txt
model.pth はモデルデータです。例えば、PyTorchでは以下のように保存します。
torch.save(model.state_dict(), "model.pth")
推論処理を実装する
inference.py に以下の処理を実装します。
- モデルデータの読み込み
model_fn
- 入力データの処理
input_fn
- 推論処理
predict_fn
- 出力データの処理
output_fn
input_fnの入力request_body
は BatchTransform で設定する BatchStrategy で決まります。
MultiRecord
か SingleRecord
を選択します。
MultiRecord を選択した場合は以下のような入力になります。
"""Record3-Attribute1, Record3-Attribute2, Record3-Attribute3, ..., Record3-AttributeM
Record4-Attribute1, Record4-Attribute2, Record4-Attribute3, ..., Record4-AttributeM
"""
SingleRecord を選択した場合には1行ずつ入力に渡されます。
"Record3-Attribute1, Record3-Attribute2, Record3-Attribute3, ..., Record3-AttributeM"
どちらを選ばれても今後の作成方法に違いはありませんが、個人的にはデータ量が突然増加してもメモリが溢れないように SingleRecord が良いと考えています。
# インポートは忘れずに
# import ...
def model_fn(model_dir):
model_path = os.path.join(model_dir, 'model.pth')
# model_pathから読み込みます
# model = ...
return model
def input_fn(request_body, request_content_type):
"""
入力データの処理
request_bodyは上記参照
request_content_typeは後述します
"""
if request_content_type == "text/csv":
# f = StringIO(request_body)
# lines = f.read().splitlines()
# ...
else:
# ...
def predict_fn(input_data, model):
"""
推論処理
input_dataはinput_fnの出力
modelはmodel_fnの出力
"""
# 推論処理を実装します
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model.to(device)
# model.eval()
# with torch.no_grad():
# return model(**input_data.to(device))
def output_fn(prediction, accept):
"""
出力処理
predictionはpredict_fnの出力
acceptは後述します
"""
if accept == "text/csv":
# results = []
# CSVに変換しやすいように\nで連結します
# return "\n".join(results)
else:
# ...
モデルパッケージを作る
tar
コマンドを使用してモデルパッケージを作成します。作成したモデルパッケージはS3にアップロードします。
tar -czvf model.tar.gz ./
# a .
# a ./code
# a ./model.pth
# a ./code/requirements.txt
# a ./code/inference.py
SageMakerにモデルを作成する
SageMakerのページで、「モデルの作成」を行います。
「モデル名」「IAM ロール」をそれぞれ入力します。
「コンテナ入力オプション」はモデルアーティファクトと推論イメージの場所を指定します。
を選択します。
「推論コードイメージの場所」にはコンテナイメージを指定します。下記ドキュメントを参考にRegistry pathを選びます。
例えばPyTorchの推論処理は 763104351884.dkr.ecr.ap-northeast-1.amazonaws.com/pytorch-inference:2.0.0-cpu-py310
などです。
「アーティファクトの場所」はS3にアップロードした model.tar.gz
のURIを入力します。
SageMaker BatchTransformを動かす
SageMakerのページで 推論 > バッチ変換ジョブ から「バッチ変換ジョブの作成」を選択します。
以下をそれぞれ入力します。
- ジョブ名 (ProcessingJob同様、一意にします)
- モデル名 (SageMakeにモデルを作成するのモデルを入力します)
- インスタンスタイプ (実行可能なインスタンスを選択します)
「追加設定」にある「バッチ戦略」が 「推論処理を実装する」 で説明した BatchStrategy です。実装に合わせて選択します。
入力/出力データ設定は 「Processing Jobを動かす」でCSVを出力していることを前提に説明します。
まず「入力データ設定」です。
- 「分割タイプ」
Line
を選択します - 「コンテンツタイプ」
input_fn の引数request_content_type
です。text/csv
と入力します - 「S3 の場所」
「Processing Jobを動かす」で出力した前処理済みデータがあるパスを指定します
最後に「出力データ設定」です。
- 「S3 出力パス」
作成したS3バケットの/output
のURIを入力します - 「組み合せ」
Line
を選択します - 「追加設定」にある「許可」
output_fn(prediction, accept)
の acceptに該当します。text/csv
を入力します
実行するには「ジョブの作成」を押します。「Processing Jobを動かす」と同様にS3への出力を確認します。エラーが発生した場合には print()
で頑張りましょう。(私は小さなミスもたくさんあり、ここのDebugが一番辛かったです。)
動くようになったら思い出して下さい。「入出力フィルタリングとデータ結合」という便利な機能もあります。興味のある方は調べてみると良いでしょう。
AWS Step Functionsを作る
Step Functions実行用IAMの作成
Step Functionsを実行するためのロールを作成します。許可ポリシーは以下のようにします。
[[アカウントID]]
となっている箇所は、作成したロールのIDに置き換えます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sagemaker:CreateProcessingJob",
"sagemaker:DescribeProcessingJob",
"sagemaker:StopProcessingJob",
"sagemaker:CreateTransformJob",
"sagemaker:DescribeTransformJob",
"sagemaker:StopTransformJob",
"sagemaker:ListTags",
"sagemaker:AddTags"
],
"Resource": [
"arn:aws:sagemaker:ap-northeast-1:[[アカウントID]]:processing-job/*",
"arn:aws:sagemaker:ap-northeast-1:[[アカウントID]]:transform-job/*"
]
},
{
"Effect": "Allow",
"Action": [
"iam:PassRole"
],
"Resource": [
始めに作成したIAMロールのARN
],
"Condition": {
"StringEquals": {
"iam:PassedToService": "sagemaker.amazonaws.com"
}
}
},
{
"Effect": "Allow",
"Action": [
"events:PutTargets",
"events:PutRule",
"events:DescribeRule"
],
"Resource": [
"arn:aws:events:ap-northeast-1:[[アカウントID]]:rule/StepFunctionsGetEventsForSageMakerProcessingJobsRule",
"arn:aws:events:ap-northeast-1:[[アカウントID]]:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule"
]
}
]
}
一意なジョブ名を作るために変数を準備する
ジョブ名に利用する変数を用意します。繰り返し実行するパイプラインで自動的に一意な名称が設定されるようにします。
Step Functionsのフローから「Pass」を3つ追加します。
3つの「Pass」の「入力」の項目にある「Parameters を使用して入力を変換 」にチェックを入れます。
それぞれで下記を入力します。
この準備をすると $.Date
で "YYYY-mm-dd"
の日付文字列 (UTC)、 $.ID
で "abcd1234"
ランダムな英数字8桁が使えます。
目的は一意なジョブ名を作成することです。この方法以外でも、ジョブ名が重複しなければ問題ありません。
{
"DateTimeArray.$": "States.StringSplit($$.Execution.StartTime, 'T')"
}
{
"IDs.$": "States.StringSplit(States.UUID(), '-')",
"DateTimeArray.$": "$.DateTimeArray"
}
{
"Date.$": "$.DateTimeArray[0]",
"ID.$": "$.IDs[0]"
}
ProcessingJobを呼び出す
作成した変数をジョブ名に含めるため States.Format
というStep Functionsの組み込み関数を利用します。
変数を利用するには対象のパラメータの末尾に .$
を追加します。(例: param_name.$
)
「タスクが完了するまで待機」にチェックを入れます。処理の終了を待って次の処理に進めます。
例を記載しますが、APIパラメータは以下のドキュメントから確認できます。
{
"ProcessingResources": {
"ClusterConfig": {
"InstanceCount": 1,
"InstanceType": インスタンス
"VolumeSizeInGB": 1
}
},
"ProcessingInputs": [
{
"InputName": "input",
"S3Input": {
"S3Uri": s3://bucket-name/input/,
"LocalPath": "/opt/ml/processing/input",
"S3DataType": "S3Prefix",
"S3InputMode": "File",
"S3DataDistributionType": "FullyReplicated",
"S3CompressionType": "None"
}
}
],
"ProcessingOutputConfig": {
"Outputs": [
{
"OutputName": "preprocessed_data",
"S3Output": {
"S3Uri": s3://bucket-name/processing-output,
"LocalPath": "/opt/ml/processing/output/",
"S3UploadMode": "EndOfJob"
}
}
]
},
"AppSpecification": {
"ImageUri": ECRにプッシュした前処理用のイメージ
},
"StoppingCondition": {
"MaxRuntimeInSeconds": 300
},
"RoleArn": SageMaker実行用のIAMロールのARN
"ProcessingJobName.$": "States.Format('ジョブ名-{}-{}', $.Date, $.ID)"
}
次のステップへ変数を引き継ぐ
もう1つProcessingJobの呼び出しで必要な対応があります。始めに設定したDateとIDの変数を次の処理で使えるようにします。
「出力」の項目にある「ResultPath を使用して元の入力を出力に追加」にチェックを入れます。そして「Discard result and keep original input」を選択します。これを設定すると、前処理が「入力」として受け取った変数を次の処理で使えます。
まずは動かすために元の入力値を維持しました。しかし、処理の終了状態を確認するためにはAPIのレスポンスが必要となるでしょう。その場合は「Combine original input with result」を選択します。例えば$.Result
とすると、元の入力を維持したまま、APIのレスポンスを$.Result
から次の処理で利用できます。
BatchTransformを呼び出す
最後にBatchTransformの呼び出しを追加します。まず「タスクが完了するまで待機」にチェックを入れます。ProcessingJob同様に例を示しますが、下記のドキュメントからパラメータは確認できます。
{
"ModelName": モデル名,
"BatchStrategy": "SingleRecord",
"TransformInput": {
"CompressionType": "None",
"ContentType": "text/csv",
"DataSource": {
"S3DataSource": {
"S3DataType": "S3Prefix",
"S3Uri": s3://bucket-name/processing-output
}
},
"SplitType": "Line"
},
"TransformOutput": {
"Accept": "text/csv",
"AssembleWith": "Line",
"S3OutputPath": s3://bucket-name/output
},
"TransformResources": {
"InstanceCount": 1,
"InstanceType": インスタンス
},
"TransformJobName.$": "States.Format('ジョブ名-{}-{}', $.Date, $.ID)"
}
Step Functionsの設定は以上です。ワークフローを保存したら実行しましょう。実行が完了し、出力結果をS3から確認できれば完成です。
お疲れ様でした。
おわりに
機械学習モデル(LLM)の作成については多くの情報が見つかりましたが、実際に運用する方法が体系的にまとめられているものを私は見つけられませんでした。AWSのドキュメントを個別に探して読むことを繰り返して、私はこのパイプラインを構築しました。この記事の各項目に公式のドキュメントも合わせて載せています。私の説明が不十分な場合はドキュメントを参考にして下さい。また、ドキュメントからIAMの設定に関する記載を私は上手く見つけられませんでした。この記事にはIAMの例も示しています。良いポリシーの設定であるかという議論は必要かもしれませんが、まず「動いてくれる」ことを私は目指しました。
最後まで読んで頂きありがとうございました。