Amazon SageMaker で、新機能の shadow testing がリリースされました
whats-new にある通り、shadow testing という新機能が出ました。
機能概要
新モデルを本番導入する時の課題
既存モデルがあり、新しいモデルに差し替えたいシーンを考えてみましょう。
「新しいモデルこそ、新しいデータを織り込んだ、あるいは新しいアルゴリズムを用いた最強のモデルである!ただ単純に新モデルをデプロイすれば良いのだ!!!」
という豪胆な方も中にはいらっしゃるかもしれませんが、一般的にはオフラインでバックテストを行い、
精度やスループット、レイテンシーなどを確認するのが一般的であり、
それらのテストを通過したモデルが初めてデプロイを行います。
またデプロイもエイヤでやらずに恐る恐る Canaly や B/G デプロイなどを行うこともあるでしょう。
しかし、それで十分なのでしょうか?
予期せぬパフォーマンス低下などが発生し、ロールバックを余儀なくされることもあります。
shadow testing
「だったら本番で試せばわかるじゃん!」
____
/ \
/ ⌒ ⌒ \ 何言ってんだこいつ
/ (●) (●) \
| 、" ゙)(__人__)" ) ___________
\ 。` ⌒゚:j´ ,/ j゙~~| | | |
__/ \ |__| | | |
| | / , \n|| | | |
| | / / r. ( こ) | | |
| | | ⌒ ーnnn |\ (⊆ソ .|_|___________|
 ̄ \__、("二) ̄ ̄ ̄ ̄ ̄l二二l二二 _|_|__|_
はい、私もそう思ってました。
本番で動かせばわかるけど、本番でそんなことできるわけがありません。
しかし、本番モデル(旧モデル)を動かしたまま、隣で新モデルをデプロイしておき、
本番モデルに流れる推論リクエストをコピーして、新モデルにも流し、推論結果やパフォーマンスをチェックする機構があれば、
本番モデルやアプリケーションに影響することなく、新モデルが問題を起こすかどうかのチェックができますよね。
それが Shadow Tests です。
また、問題が発生しなければ、既存のデプロイメントガードレールに沿ってデプロイもできます。
やってみよう
手順
大まかな手順はこちらです。
- 予め本番モデルを SageMaker Hosting を利用して、デプロイしておき、推論 Request Ready の状況にしておく
- 新モデルを SageMaker に登録(CreateModel APIを実行)しておく
- Shadow Tests の設定を行う
- 推論リクエストを行って負荷をかける
- 新モデルをデプロイする
本番モデルを推論 Request Ready にする
推論モデルの準備
今回は、本番モデルに RESNET-18, 新モデルに MobilenetV2 を用います。理由としては input/output の形が同じであり、比較しやすかったからです。まずはそれぞれのモデルを準備します。
の、前に使うライブラリなどを用意しておきます。(以降、SageMaker Studio の PyTorch 1.12 Python 3.8 CPU Optimized カーネルでノートブックで実行することを前提とします)
# SageMaker SDK をアップデート。私は 2.111.0 で実行している前提。
!pip install -U sagemaker
# 使用するライブラリを読み込み
import torch, os, json
from multiprocessing import Pool
import boto3
import urllib
import sagemaker
from PIL import Image
from torchvision import transforms
from datetime import datetime
# client その他を呼び出し
smr_client = boto3.client('sagemaker-runtime')
sm_client = boto3.client('sagemaker')
endpoint_inservice_waiter = sm_client.get_waiter('endpoint_in_service')
role = sagemaker.get_execution_role()
モデルのダウンロード
pytorch hub にあるモデルを呼び出して保存します。
RESNET
resnet_dir = 'resnet18'
!mkdir -p {resnet_dir}
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
torch.save(model.to('cpu'), os.path.join(resnet_dir,'resnet18.pth'))
MobilenetV2
mobilenet_dir = 'mobilenet_v2'
!mkdir -p {mobilenet_dir}
model = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v2', pretrained=True)
torch.save(model.to('cpu'), os.path.join(mobilenet_dir,'mobilenet_v2.pth'))
これでそれぞれのモデルの名前のディレクトリに pth の形で保存できました。
推論データをダウンロード
こちらは、GitHub に上がっている写真を利用します。
url, filename = ("https://github.com/pytorch/hub/raw/master/images/dog.jpg", "dog.jpg")
try: urllib.URLopener().retrieve(url, filename)
except: urllib.request.urlretrieve(url, filename)
推論コードを作成
モデルの推論コードを用意します。
model_fn
, input_fn
, predict_fn
, output_fn
を用意します。
詳細はこちらの動画をご覧ください。
%%writefile ./inference.py
import os, json, torch
from glob import glob
def model_fn(model_dir):
model_file = glob(os.path.join(model_dir,'*.pth'))[0]
print(f'loading {model_file}...')
model = torch.load(model_file)
return model
def input_fn(input_data,content_type):
return torch.tensor(json.loads(input_data))
def predict_fn(transformed_data,model):
with torch.no_grad():
output = model(transformed_data)
probabilities = torch.nn.functional.softmax(output[0], dim=0)
return json.dumps(probabilities.unsqueeze(0).to('cpu').detach().numpy().copy().tolist()[0])
def output_fn(output_data,accept):
print(f'response is {output_data}')
return output_data
作成した推論コードを、それぞれのモデルが格納されたディレクトリに配置します。
!cp inference.py {resnet_dir}
!mv inference.py {mobilenet_dir}
推論コードとモデルを S3 にアップロードする
SageMaker で推論エンドポイントを立ち上げるにはモデル一式を tag.gz
に固めて S3 にアップロードする必要があります。
それぞれのモデル(と推論コード、とラベル)をアップロードします。
%cd {resnet_dir}
!tar zcvf model.tar.gz ./*
%cd ..
resnet_model_s3_uri = sagemaker.session.Session().upload_data(
f'./{resnet_dir}/model.tar.gz',
key_prefix = 'resnet'
)
%cd {mobilenet_dir}
!tar zcvf model.tar.gz ./*
%cd ..
mobilenet_model_s3_uri = sagemaker.session.Session().upload_data(
f'./{mobilenet_dir}/model.tar.gz',
key_prefix = 'mobilenet'
)
モデルをデプロイする
本番モデル相当のモデル(RESNET)を SageMaker Hosting を用いて、リアルタイム推論 Ready の状態にします。
# 定数設定
resnet_model_name = 'ResnetModel'
resnet_endpoint_config_name = resnet_model_name + 'EndpointConfig'
resnet_endpoint_name = resnet_model_name + 'Endpoint'
# コンテナイメージの URL を取得
region = sagemaker.session.Session().boto_region_name
container_image_uri = sagemaker.image_uris.retrieve(
"pytorch",
region,
version='1.12.0',
instance_type = 'ml.m5.xlarge',
image_scope = 'inference'
)
print(container_image_uri)
# Model 作成
resnet_model_response = sm_client.create_model(
ModelName=resnet_model_name,
PrimaryContainer={
'Image': container_image_uri,
'ModelDataUrl': resnet_model_s3_uri,
'Environment': {
'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
'SAGEMAKER_PROGRAM': 'inference.py',
'SAGEMAKER_REGION': region,
'SAGEMAKER_SUBMIT_DIRECTORY': '/opt/ml/model/code'}
},
ExecutionRoleArn=role,
)
# EndpointConfig 作成
response = sm_client.create_endpoint_config(
EndpointConfigName=resnet_endpoint_config_name,
ProductionVariants=[
{
'VariantName': 'AllTrafic',
'ModelName': resnet_model_name,
'InitialInstanceCount': 1,
'InstanceType': 'ml.m5.xlarge',
},
],
)
# Endpoint 作成
response = sm_client.create_endpoint(
EndpointName=resnet_endpoint_name,
EndpointConfigName=resnet_endpoint_config_name,
)
# Endpoint が有効化されるまで待つ
endpoint_inservice_waiter.wait(
EndpointName=resnet_endpoint_name,
WaiterConfig={'Delay': 5,}
)
さて、一連の操作が終わったら、出来上がっているかどうかをマネジメントコンソールで確認しておきましょう。
Model の確認
EndpointConfig の確認
Endpoint の確認
新モデルを SageMaker に登録しておく
次に新モデルを SageMaker に登録(CreateModel)しましょう。
# 定数設定
mobilenet_model_name = 'MobilenetModel'
mobilenet_endpoint_config_name = mobilenet_model_name + 'EndpointConfig'
mobilenet_endpoint_name = mobilenet_model_name + 'Endpoint'
mobilenet_model_response = sm_client.create_model(
ModelName=mobilenet_model_name,
PrimaryContainer={
'Image': container_image_uri,
'ModelDataUrl': mobilenet_model_s3_uri,
'Environment': {
'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
'SAGEMAKER_PROGRAM': 'inference.py',
'SAGEMAKER_REGION': region,
'SAGEMAKER_SUBMIT_DIRECTORY': '/opt/ml/model/code'}
},
ExecutionRoleArn=role,
)
shadow test を作る
さて、ここからマネジメントコンソールを操作していきます。
SageMaker の左のペインに Shadow tests という項目が増えているのでこちらをクリックします。
適当な名前を入れて Next
を押します。今回は MyFirstShadowTest
としました。
AmazonSageMakerFullAccess
がアタッチされているロールを選択します。
先程作成した Resnet の Endpoint を選択します。
Add
から Shadow variant
を選択します。
先程作成した mobilenet のモデルを選択して Save
をクリックします。
Variants に Shadow-01
という Variant が追加(Model は MobilenetModel)されるので選択します。
Shadow Test する期間を指定します。デフォルトだと 1 週間ですが、今回は 2 時間としました。Apply
をクリックします。
最後に Enable data capture を ON にして S3 に request 及び response をキャプチャーする設定をして、 Create shadow test
をクリックします。キャプチャーすることで、新旧モデルの推論結果を比較できます。
shadow test
が Creating
というステータスに変わります。
待ちますが、裏側でどんなことをしているかを覗いて見ます。
DescribeEndpoint API を叩いてみましょう。
sm_client.describe_endpoint(
EndpointName = resnet_endpoint_name
)
{'EndpointName': 'ResnetModelEndpoint',
'EndpointArn': 'arn:aws:sagemaker:REGION:ACCOUNT:endpoint/resnetmodelendpoint',
'EndpointConfigName': 'ResnetModelEndpointConfig',
'ProductionVariants': [{'VariantName': 'AllTrafic',
(中略)
'PendingDeploymentSummary': {'EndpointConfigName': 'MyFirstShadowTes-EpConfig-DTMNBgzfjUJLHJGq',
'ProductionVariants': [{'VariantName': 'AllTrafic',
(中略)
'ShadowProductionVariants': [{'VariantName': 'Shadow-01',
'RetryAttempts': 0}}
このように shadow test が反映されているのがわかります。また、 EndpointConfig
キーがあることからも EndpointConfig にも設定が及んでいることがわかります。同様に DescribeEndpointConfig API も叩いてみましょう。
shadow_test_ep_config_name = sm_client.describe_endpoint(EndpointName = resnet_endpoint_name)['EndpointConfigName']
sm_client.describe_endpoint_config(
EndpointConfigName = shadow_test_ep_config_name
)
{'EndpointConfigName': 'MyFirstShadowTes-EpConfig-DTMNBgzfjUJLHJGq',
'EndpointConfigArn': 'arn:aws:sagemaker:REGION:ACCOUNT:endpoint-config/myfirstshadowtes-epconfig-dtmnbgzfjujlhjgq',
'ProductionVariants': [{'VariantName': 'AllTrafic',
'ModelName': 'ResnetModel',
'InitialInstanceCount': 1,
'InstanceType': 'ml.m5.xlarge',
'InitialVariantWeight': 100.0}],
(略)
'ShadowProductionVariants': [{'VariantName': 'Shadow-01',
'ModelName': 'MobilenetModel',
'InitialInstanceCount': 1,
'InstanceType': 'ml.m5.xlarge',
'InitialVariantWeight': 100.0}],
(略)
このように新しい EndpointConfig が作成されていることが確認できました。実は自分で明示的に作成した EndpointConfig から shadow test で作った EndpointConfig に差し替わっています。
マネジメントコンソールで Endpoint を確認した場合の表示は以下
shadow test を実施
先程ダウンロードしたデータで推論リクエストを投げ込んで負荷テストしてみます。
# リクエストデータを作成
input_image = Image.open(filename)
preprocess = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
input_tensor = preprocess(input_image)
body = json.dumps(input_tensor.unsqueeze(0).to('cpu').detach().numpy().copy().tolist())
これで推論の準備が整いました。まずは推論を 1 回投げ込んでみます。
response = smr_client.invoke_endpoint(
EndpointName=resnet_endpoint_name,
Body=body,
ContentType='application/json',
Accept='application/json',
)
prediction = response['Body'].read().decode('utf-8')
print(prediction)
[0.00039786743582226336, 0.00041894978494383395, 0.00038960116216912866, 0.00014468713197857141,…
とりあえず RESNET18 が各クラスの probability を返してくれていることがわかりました。モデルが mobilenet になったらこの数値が変わるはずなので、最初の 0.0000397…
という数値だけ覚えておきましょう。
さて、ここから負荷をかけてみましょう。
def invoke_rt_endpoint(body):
response = smr_client.invoke_endpoint(
EndpointName=resnet_endpoint_name,
Body=body,
ContentType='application/json',
Accept='application/json',
)
%%time
with Pool(processes=2) as pool:
pool.map(invoke_rt_endpoint, [body for i in range(10000)])
2 Parallels で合計 10000回のリクエストを投げてみました。
途中ですが、shadow test の Metrics を見てみましょう。
2500回程度のリクエストでの結果ですが、新モデル(mobilenet)のほうが重いです。4倍 近くかかっています。これは ModelLatency です。たしかに RESNET18 とは違うモデルが動いているように思います。他にもいくつか見られます。
CPU 使用率も見てみましょう。レイテンシーが増えているので CPU 使用率もその分上がっていました。
Analyticsのほうを見るとグラフで見たりすることもできます。
また、shadow test のモデルの推論結果を覗いてみましょう。指定した S3 Bucket/prefix に jsonl
の形で保管されていることがわかります。
Shadow test が通ったモデルのデプロイ
推論時間がいきなり 4 倍になって許容できるケースは珍しいですが、これならデプロイしてもいいだろう、という結論にいたったとします。shadow test 途中ですが中断して、新モデルのデプロイをしてみましょう。
実行中の shadow test の最下部に Deploy shadow variant
というボタンがあるのでクリックします。
すべてデフォルトのまま最下部に deploy
とタイプして Deploy shadow variant
をクリックします。
Status
が Stopping
になり、Progress が 100% になりました。このまま Status
が Completed になるまで待ちます。
さて、モデルが変わったか確認しましょう。
先程と同じ推論リクエストを投げてみます。
resnet という Endpoint の名前に対して mobilenet という EndpointConfig を使うのは気持ち悪いですが、これで更新されました。
最後に推論してモデルが差し替わっていることを確認しましょう。
response = smr_client.invoke_endpoint(
EndpointName=resnet_endpoint_name,
Body=body,
ContentType='application/json',
Accept='application/json',
)
prediction = response['Body'].read().decode('utf-8')
print(prediction)
[9.436015534447506e-05, 0.00025361028383485973,…
0.0000397…から9.436015534447506e-05に変わり、モデルが変更されたことが確認できました。
boto3 で shadow test をやってみよう
ここまで、shadow test をマネジメントコンソールでやってきましたが、システムに組み込むには API を直接叩いて実行しないといけません。先述の通り、shadow test を表現した EndpointConfig を用意して Endpoint を作成します。
片付けから
まずは先に作成した Model/EndpointConfig/Endpoint を削除しておきます。(他にも shadow test で作成された EndpointConfig はマネジメントコンソールから削除したこととします)
sm_client.delete_endpoint(EndpointName=resnet_endpoint_name)
sm_client.delete_endpoint_config(EndpointConfigName=resnet_endpoint_config_name)
sm_client.delete_model(ModelName=resnet_model_name)
sm_client.delete_model(EndpointName=mobilenet)
たんたんとコードを実行
再度モデルを作り直します。先程と同じです。
# RESNET
resnet_model_response = sm_client.create_model(
ModelName=resnet_model_name,
PrimaryContainer={
'Image': container_image_uri,
'ModelDataUrl': resnet_model_s3_uri,
'Environment': {
'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
'SAGEMAKER_PROGRAM': 'inference.py',
'SAGEMAKER_REGION': region,
'SAGEMAKER_SUBMIT_DIRECTORY': '/opt/ml/model/code'}
},
ExecutionRoleArn=role,
)
# mobilenet
mobilenet_model_response = sm_client.create_model(
ModelName=mobilenet_model_name,
PrimaryContainer={
'Image': container_image_uri,
'ModelDataUrl': mobilenet_model_s3_uri,
'Environment': {
'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
'SAGEMAKER_PROGRAM': 'inference.py',
'SAGEMAKER_REGION': region,
'SAGEMAKER_SUBMIT_DIRECTORY': '/opt/ml/model/code'}
},
ExecutionRoleArn=role,
)
次に EndpointConfig の作成です。ShadowProductionVariants
という引数を設定します。
shadow_test_endpoint_config_name = 'ShadowTestEndpointConfig'
response = sm_client.create_endpoint_config(
EndpointConfigName=shadow_test_endpoint_config_name,
ProductionVariants=[
{
'VariantName': 'AllTrafic',
'ModelName': resnet_model_name,
'InitialInstanceCount': 1,
'InstanceType': 'ml.m5.xlarge',
},
],
ShadowProductionVariants=[
{
'VariantName': 'Shadow-01',
'ModelName': mobilenet_model_name,
'InitialInstanceCount': 1,
'InstanceType': 'ml.m5.xlarge',
'InitialVariantWeight': 100.0
}
]
)
次に Endpoint を作成します。
shadow_test_endpoint_name = 'ShadowTestEndpoint'
response = sm_client.create_endpoint(
EndpointName=shadow_test_endpoint_name,
EndpointConfigName=shadow_test_endpoint_config_name,
)
最後に shadow test のスケジュールをします。スケジュールは、InferenceExperiment というのが管理されているので、CreateInferenceExperiment API を実行します。
inference_experiment_name = 'ShadowInferenceExperiment'
response = sm_client.create_inference_experiment(
Name=inference_experiment_name,
Type='ShadowMode',
Schedule={
'StartTime': datetime(2022, 12, 14),
'EndTime': datetime(2022, 12, 15)
},
RoleArn=role,
EndpointName=shadow_test_endpoint_name,
ModelVariants=[
{
'ModelName': resnet_model_name,
'VariantName': 'AllTrafic',
'InfrastructureConfig': {
'InfrastructureType': 'RealTimeInference',
'RealTimeInferenceConfig': {
'InstanceType': 'ml.m5.xlarge',
'InstanceCount': 1
}
}
},
{
'ModelName': mobilenet_model_name,
'VariantName': 'Shadow-01',
'InfrastructureConfig': {
'InfrastructureType': 'RealTimeInference',
'RealTimeInferenceConfig': {
'InstanceType': 'ml.m5.xlarge',
'InstanceCount': 1
}
}
}
],
ShadowModeConfig={
'SourceModelVariantName': 'AllTrafic',
'ShadowModelVariants': [
{
'ShadowModelVariantName': 'Shadow-01',
'SamplingPercentage': 100
},
]
},
)
するとマネジメントコンソールに、新しい shadow test が加わります。
あとは推論リクエストを投げ込むなり、モニターしたりして OK と判断できれば、新モデルの EndpointConfig を作成して、UpdateEndpoint API を叩けば OK です。
まとめ
このように本番運用するか迷うモデルをに対して、shadow test を作成すると、本番環境と同じ推論リクエストをコピーして、同じ負荷をかけ、また推論結果を S3 に吐き出すことができます。
あとはパフォーマンスや推論の精度を見て、許容できる範囲に収まるか、をチェックするとより安全にデプロイできるでしょう。
また、この記事では触れてませんが、いざデプロイする際は、UpdateEndpoint API を叩く際の DeploymentConfig
で CANARY などの戦略を選ぶと更に安全にデプロイできることでしょう。
というわけで SageMaker の新しい仲間、shadow test をよろしくおねがいします!