はじめに
機械学習モデルをAPIで利用するには、TensorFlow ServingのようなServingライブラリ1を用いると実現できますが、低レイテンシが求められるケースではgRPCでの通信を用いることが多いかと思います。
推論APIに限らずgRPCの負荷分散を行うには、いくつかの方法がありますがプロキシサーバとしてenvoyを用いるケース2が一般的になりつつあるようです。そんな中、2020年10月にALBがE2EでgRPCをサポートするようになりました。
そこで、本記事では、
- ローカルでのTF Servingを用いた推論
- ECSでのServingサーバ構築
について触れた後、
- ALBを用いた推論の負荷分散
- 負荷に応じたスケール変更
について紹介します。
TF ServingをECSで動作させたことをあるよっていう方は ALBを用いて推論の負荷分散を行う まで読み飛ばしていただければと思います。
また、コードはGitHubにまとめています。
構築する構成
最終的な構成は以下の図のようになります。
クライアントからALBまではHTTPS接続し、ALBからTF ServingのコンテナまではHTTP接続で通信を行います。gRPCはクライアントからコンテナまでE2Eで用います。
ローカルでTF Servingを動かす
ALBでの推論を行う前に、まずは1台のServingサーバをECSに構築します。
使用するモデル
TF Servingでは、SavedModel形式のモデルが利用できます。今回は学習済みのモデルをModel Zooから選んで用いることにします。
どれでもいいので、一番上にあった物体検知のssd_mobilenet_v1_cocoを使うことにします。ダウンロードしたファイルのうち使用するのはsaved_model.pb
のみです。
また、以下のようなディレクトリ構成に格納しておきます。
models
`-- ssd
`-- 1
`-- saved_model.pb
今回は使用するモデルの種類もバージョンも1つですが、TF Servingでは、複数のモデル、複数のバージョンを扱うことができるので、このような構成になります。
modelsはモデルのrootディレクトリのようなもので、TF Serving実行時にはMODEL_BASE_PATH
という環境変数で指定できます。ssdはモデル名となります。同じくMODEL_NAME
で指定できます。1はモデルのバージョンを表します。
SavedModelの中身確認
saved_model_cli
を使ってダウンロードしたモデルの入出形式を確認します。
saved_model_cli
はTensorFlowに付随しているツールで、pip install tensorflow
などでTensorFlow本体と同時にインストールできます。
saved_model_cli show --all --dir models/ssd/1/
MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:
signature_def['serving_default']:
The given SavedModel SignatureDef contains the following input(s):
inputs['inputs'] tensor_info:
dtype: DT_UINT8
shape: (-1, -1, -1, 3)
name: image_tensor:0
The given SavedModel SignatureDef contains the following output(s):
outputs['detection_boxes'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 100, 4)
name: detection_boxes:0
outputs['detection_classes'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 100)
name: detection_classes:0
outputs['detection_scores'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 100)
name: detection_scores:0
outputs['num_detections'] tensor_info:
dtype: DT_FLOAT
shape: (-1)
name: num_detections:0
Method name is: tensorflow/serving/predict
後々、必要なのは、signature, input, outputの名前がそれぞれ
- serving_default
- inputs
- detection_boxes, detection_classes, detection_scores, num_detections
であることです。
また、inputは1つですがoutputは4つあることや、それぞれの形状も分かります。shapeの-1は任意の次元を表します。
TF Servingでの推論
Servingサーバの起動
Servingサーバの起動するため、Dockerイメージを用意します。
モデル名であるssdをMODEL_NAME
に指定し、MODEL_BASE_PATH
はデフォルトが/models
なので変更せず/models/ssd/1
にSavedModelを配置することにします。
FROM tensorflow/serving:2.3.0
ENV MODEL_NAME=ssd
RUN apt-get update \
&& apt-get install -y --no-install-recommends curl \
&& apt-get -y clean \
&& rm -rf /var/lib/apt/lists/*
RUN curl -sL http://download.tensorflow.org/models/object_detection/ssd_mobilenet_v1_coco_2018_01_28.tar.gz \
| tar xz -C /tmp ssd_mobilenet_v1_coco_2018_01_28/saved_model/saved_model.pb \
&& mkdir -p /models/${MODEL_NAME}/1 \
&& mv /tmp/ssd_mobilenet_v1_coco_2018_01_28/saved_model/saved_model.pb /models/${MODEL_NAME}/1/saved_model.pb
上のようなDockerfileから、Servingサーバを起動します。
docker build -t serving .
docker run -p 8500:8500 serving
Pythonクライアントでのリクエスト
サーバを用意できたので、次にクライアントを作成します。
まず、画像を読み込みます。
import cv2
import numpy as np
image = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2RGB)
images = image[np.newaxis, :, :, :]
numpy形式のデータを、TF Servingにリクエストするための形式に変換します。ここでsaved_model_cli
で調べたsignatureやinputsと、Dockerfileで設定したモデル名を指定します。
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2
request = predict_pb2.PredictRequest()
request.model_spec.name = "ssd"
request.model_spec.signature_name = "serving_default"
request.inputs["inputs"].CopyFrom(tf.make_tensor_proto(images))
次にgrcp.insecure_channelでgRPCチャネルを作成します。引数にはローカルで起動しているServingサーバのURLとポートを指定します。
stub.Predict
でリクエストすると推論結果が得られます。
import grcp
from tensorflow_serving.apis import prediction_service_pb2_grpc
with grpc.insecure_channel("localhost:8500") as channel:
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
result = stub.Predict(request)
result.outputs
に出力が格納されており、saved_model_cli
で調べたoutputs名をkeyにすると取り出すことができますが、TensorProto
形式なので扱いやすいようにnumpy形式に変換します。
num_detections = tf.make_ndarray(result.outputs["num_detections"])
detection_boxes = tf.make_ndarray(result.outputs["detection_boxes"])
detection_classes = tf.make_ndarray(result.outputs["detection_classes"])
detection_scores = tf.make_ndarray(result.outputs["detection_scores"])
出力はモデルによって異なりますが、今回のモデルでは
-
num_detections
は検出した数 -
detection_classes
は検出したクラスID- クラスIDとクラス名の対応表はここにあります。
-
detection_scores
はスコア(信頼度) -
detection_boxes
はbounding boxの四隅の値- 左端のx、右端のx、上端のy、下端のyの順です。
- 正規化されているので、画像の横幅や縦幅を掛ける必要があります。
になります。
例えば、以下のような結果になります。なお、detection_classes[i][j]
などの二次元配列の値は、i
番目のバッチでj
番目に検出したオブジェクトの値になります。
print(f"class={detection_classes[0][0]}, score={detection_scores[0][0]}, boxes={detection_scores[0][0]}")
class=1.0, score=0.9743548631668091, boxes=[0.06180963 0.0141598 0.93000424 0.9013557 ]
また、ここまでのコードをinfer.py
としてまとめ、以下のようにURLやポートを引数で指定できるものとします。
python infer.py --url localhost --port 8500
ECSにServingサーバを構築する
ローカルで推論できたので、ECS上でServingサーバを動かしていきます。
ECSにデプロイするツールはecs-cliや最近GAになったCopilotなどいくつかありますが、今回はシンプルにawscliを用いたいと思います。
また、ECSの起動タイプはFargateでも問題ないですが、GPUやElastic Inference、Inferentiaといった推論用のデバイスが利用できないので、起動タイプにはEC2を用いることにします。ただし、今回はCPUのみです。
ECSクラスタやVPC、IAMロールなどは予め用意されているものとして、説明は省略します。後々Auto Scaling GroupでECSインスタンスを作成しますが、ここでは1台のインスタンスが使えるものとします。
タスク定義の登録
作成したDockerイメージをECRにアップロードした後、タスク定義を登録します。
aws ecs register-task-definition --cli-input-json file://task.json
{
"family": "serving",
"executionRoleArn": "ecsTaskExecutionRole",
"networkMode": "awsvpc",
"containerDefinitions": [
{
"name": "serving",
"image": "<AWS Account ID>.dkr.ecr.ap-northeast-1.amazonaws.com/serving:latest",
"memory": 512,
"portMappings": [
{
"containerPort": 8500,
"hostPort": 8500,
"protocol": "tcp"
}
]
}
],
"requiresCompatibilities": [
"EC2"
]
}
サービスの作成
登録したタスク定義からサービスを作成します。セキュリティグループは8500が開いているものを指定します。サブネットはプライベートネットワークでも良いですが、パブリックネットワークのほうがアクセスが楽です。
aws ecs create-service --cli-input-json file://service-without-alb.json
{
"cluster": "tfserving-alb-example",
"serviceName": "serving-without-alb",
"taskDefinition": "serving:1",
"launchType": "EC2",
"networkConfiguration": {
"awsvpcConfiguration": {
"securityGroups": [
"sg-11111111111111111"
],
"subnets": [
"subnet-11111111111111111",
],
"assignPublicIp": "ENABLED"
}
},
"desiredCount": 1
}
推論リクエスト
Servingサーバのタスクが起動したら、ローカルと同様にリクエストを送ってみます。
ローカルでの推論のとき、grpc.insecure_channel("localhost:8500")
ところを、今回構築したコンテナのIPアドレスに変更すると、同じように推論をすることができます。
python infer.py --url 203.0.113.1 --port 8500
ALBを用いて推論の負荷分散を行う
1台のTF ServingがECS上で構築できたところで、ようやくメインであるALB経由での推論をやっていきます。
ALBの作成と設定
はじめにロードバランサを作成します。サブネットはパブリックネットワークで、セキュリティグループは443を開けておきます。
aws elbv2 create-load-balancer \
--name serving-alb \
--subnets subnet-11111111111111111 subnet-22222222222222222 \
--security-groups sg-11111111111111111
次にターゲットグループを作成します。ターゲットであるServingサーバに対してはgRPCを用いてHTTP:8500で通信を行うので、次のように指定します。
aws elbv2 create-target-group \
--name serving-target-group \
--protocol HTTP --protocol-version GRPC --port 8500 \
--vpc-id vpc-11111111111111111 \
--target-type ip
リスナーの作成では、ALBはHTTPS:443でリクエストを受け付けるので、次のように指定します。
ロードバランサやターゲットグループのARMは先程作成したものを入れます。ALBでHTTP/2やgRPCを利用する際は、TLSが必須なのでCertificate Managerで管理する証明書も指定します。
aws elbv2 create-listener \
--protocol HTTPS --port 443 \
--load-balancer-arn arn:aws:elasticloadbalancing:ap-northeast-1:11111111111111111:loadbalancer/app/serving-alb/example \
--default-actions Type=forward,TargetGroupArn=arn:aws:elasticloadbalancing:ap-northeast-1:11111111111111111:targetgroup/serving-target-group/example \
--certificates CertificateArn=arn:aws:acm:ap-northeast-1:11111111111111111:certificate/example
ALBの準備ができたら、Route53でロードバランサのエイリアスとしてAレコードを作成します。使用するドメインは証明書のドメインと一致する必要があります。
リスナー側のTLSが必須なので、このあたりが少し厄介ですね。
ALBの設定を追加したサービスの作成
タスク定義はALBを使わない場合と同じもので大丈夫ですが、サービスはALBの設定を行うため以下のように変更したものを用います。
ALB経由でアクセスするので、サブネットはプライベートネットワークのものを用いると良いです。タスク数はとりあえず5にしておきます。
{
"cluster": "tfserving-alb-example",
"serviceName": "serving-with-alb",
"taskDefinition": "serving:1",
"loadBalancers": [
{
"targetGroupArn": "arn:aws:elasticloadbalancing:ap-northeast-1:11111111111111111:targetgroup/serving-target-group/example",
"containerName": "serving",
"containerPort": 8500
}
],
"launchType": "EC2",
"networkConfiguration": {
"awsvpcConfiguration": {
"securityGroups": [
"sg-11111111111111111"
],
"subnets": [
"subnet-11111111111111111",
],
"assignPublicIp": "DISABLED"
}
},
"desiredCount": 5
}
また、すでに作成されているサービスにALBを追加することはできないので、新しくサービスを作成します。
aws ecs create-service --cli-input-json file://service-with-alb.json
ALBとサービスの作成が完了すると、コンテナのIPが自動的にターゲットグループのターゲットに登録され、ALB経由でServingサーバにアクセスできるようになります。
クライアントの変更
これで設定したURLにアクセスできるようになりますが、クライアント側もSSLに対応させる必要があります。
gRPCチャネルに接続する
with grpc.insecure_channel(target) as channel:
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
result = stub.Predict(request)
のところを
cred = grpc.ssl_channel_credentials()
with grpc.secure_channel(target, cred) as channel:
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
result = stub.Predict(request)
のように、grpc.secure_channelに変更します。
変更後、URLを指定して推論が実行できます。もちろんURLはRoute53のレコードを作成したときのものです。
python infer.py --url serving.example.com --port 443
ALBのログを確認
複数のServingサーバが利用されているか確かめるため、数回推論を実行したあとALBのログを確認してみたいと思います。
ログをAthenaでみると、複数のtarget_ipにリクエストされていることが分かります。
SELECT type, time, target_ip, user_agent FROM "serving_alb_db"."alb_logs" ORDER BY time DESC LIMIT 10
リクエスト数に応じてオートスケールを行う
手動でServingサーバの台数(タスク数)を指定しましたが、リクエスト数によって自動でサーバの台数を増減できるようオートスケールの設定をしたいと思います。
起動タイプがEC2の場合、オートスケールはECSタスクとECSインスタンスの2つを考慮する必要があります。ちなみに、Fargateの場合はECSインスタンスはないので前者のみです。
- ECSタスクのオートスケール
- Service Auto Scalingを用いる
- リクエスト数の増減によって、タスク数を増減させる
- ECSインスタンスのオートスケール
- Cluster Auto Scalingを用いる
- 必要なインスタンス数と現在のインスタンス数の割合によって、インスタンス数を増減させる
この2つを用いることで、スケールアウトは以下のような流れで実現できます。
- リクエストが増える
- Service Auto Scalingがタスク数を増やそうとする
- インスタンス数が足りない場合はタスク数が増やせない(プロビジョニングのまま)
- Cluster Auto Scalingがインスタンス数を増やす
- 新しいインスタンスでプロビジョニングだったタスクが起動する
あとで述べますが、Cluster Auto Scalingが直接インスタンス数を増減させるわけではないです。
ECSタスクのオートスケール
register-scalable-targetでオートスケールするリソースを登録します。ECSタスクの必要数の最大容量と最小容量や関連するリソース(ALB)を指定します。
aws application-autoscaling register-scalable-target \
--service-namespace ecs \
--scalable-dimension ecs:service:DesiredCount \
--resource-id service/tfserving-alb-example/serving-with-alb \
--min-capacity 1 --max-capacity 20
次にput-scaling-policyでスケーリングポリシーを作成します。
メトリクスに対するターゲットを追跡するターゲットトラッキングポリシーを用いることで、ALBのリクエスト数(ALBRequestCountPerTarget)がTargetValueの値になるように、タスク数が増減できます。
使用できるメトリクスは、他にCPU使用率やメモリ使用率があります。
aws application-autoscaling put-scaling-policy --cli-input-json file://scaling-policy.json
{
"PolicyName": "serving-scaling-policy",
"ServiceNamespace": "ecs",
"ResourceId": "service/tfserving-alb-example/serving-with-alb",
"ScalableDimension": "ecs:service:DesiredCount",
"PolicyType": "TargetTrackingScaling",
"TargetTrackingScalingPolicyConfiguration": {
"TargetValue": 1,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "ALBRequestCountPerTarget",
"ResourceLabel": "app/serving-alb/example/targetgroup/serving-target-group/example"
},
"ScaleOutCooldown": 60,
"ScaleInCooldown": 60
}
}
ここまで設定するとリクエスト数が増加したとき、(インスタンスに余裕がある場合は)タスクが自動で増えることが確認できます。もちろん、インスタンスは増加しないので、リソースが足りない場合はタスクが作成されることはありません。
ECSインスタンスのオートスケール
Service Auto Scalingによって必要なタスクは自動で増減するようになりましたが、配置できるインスタンスが存在しない場合はタスクが作成されません。そこで、Cluster Auto Scaling (CAS)の設定を行いインスタンス数もオートスケールできるようにします。
CASを使うにはキャパシティプロバイダーを設定する必要があり、キャパシティプロバイダーは、CapacityProviderReservationというメトリクスに基づくスケーリングポリシーをAuto Scaling Group (ASG)に付与します。ASGはこのポリシーに従いインスタンスの増減を行います。
CapacityProviderReservationは、CASがすべてのタスクを実行するために必要だと見積もったインスタンス数(=M)とASG内の現在のインスタンス数(=N)との割合で定義されます。
CapacityProviderReservation = M / N * 100
例えばM > N
のときは、必要なインスタンス数が現在のインスタンス数より上回っているので、スケールアウトが必要ということになります。また、このCapacityProviderReservationはCloudWatchで確認ができます。
詳しくはこちらのAWSブログの記事を参照すると良いです。
Auto Scaling Groupの作成
先にASGを作成しておきます。
ASGを作成するために、EC2のASGの起動設定を作成します。
aws autoscaling create-launch-configuration --cli-input-json file://launch.json
{
"LaunchConfigurationName": "launch-config",
"ImageId": "ami-0b60185623255ce57",
"SecurityGroups": [
"sg-11111111111111111"
],
"UserData": "#!/bin/bash\necho ECS_CLUSTER=tfserving-alb-example >> /etc/ecs/ecs.config",
"InstanceType": "t3.micro",
"IamInstanceProfile": "arn:aws:iam::111111111111:instance-profile/ecsInstanceRole",
"AssociatePublicIpAddress": false
}
起動設定をもとにASGを作成します。スケーリングポリシーはCASによって追加されるので、ここでは設定しません。
aws autoscaling create-auto-scaling-group --cli-input-json file://asg.json
{
"AutoScalingGroupName": "serving-asg",
"LaunchConfigurationName": "launch-config",
"MinSize": 0,
"MaxSize": 10,
"DesiredCapacity": 1,
"HealthCheckType": "EC2",
"HealthCheckGracePeriod": 300,
"VPCZoneIdentifier": "subnet-11111111111111111,subnet-22222222222222222",
"NewInstancesProtectedFromScaleIn": true
}
Cluster Auto Scalingの設定
create-capacity-providerでキャパシティプロバイダーを作成します。targetCapacityは、目指すべきCapacityProviderReservation
の値です。ASGはこの値を維持するようにスケールを調整します。
aws ecs create-capacity-provider --cli-input-json file://capacity-provider.json
{
"name": "serving-capacity-provider",
"autoScalingGroupProvider": {
"autoScalingGroupArn": "arn:aws:autoscaling:ap-northeast-1:111111111111:autoScalingGroup:example:autoScalingGroupName/serving-asg",
"managedScaling": {
"status": "ENABLED",
"targetCapacity": 100,
"minimumScalingStepSize": 1,
"maximumScalingStepSize": 10
},
"managedTerminationProtection": "ENABLED"
}
}
作成したキャパシティプロバイダーをクラスタのデフォルトキャパシティプロバイダーに設定します。
aws ecs put-cluster-capacity-providers \
--cluster tfserving-alb-example \
--capacity-providers serving-capacity-provider \
--default-capacity-provider-strategy capacityProvider=serving-capacity-provider,weight=1 \
次に、既存のサービスにキャパシティプロバイダーに設定します。EC2起動タイプからキャパシティプロバイダー戦略に変更する際には--force-new-deployment
が必要になります。
aws ecs update-service \
--cluster tfserving-alb-example --service serving-with-alb \
--capacity-provider-strategy capacityProvider=serving-capacity-provider,weight=1 \
--force-new-deployment
大量リクエストをしてスケール確認
Pythonクライアントを用いて大量にリクエストを送ってみます。
少しするとタスク一覧は以下のようになりました。いくつかのタスクはRUNNINGになっていますが、PROVISIONINGのままのものあります。
もうしばらく待つとすべてRUNNINGになり、はじめ1台しかなかったECSインスタンスも10台まで増えていました。さらに時間がたつと、タスク数が1に戻り、その後ECSインスタンスも1台に戻りました。(ECSインスタンスのスケールインは結構時間がかかります。)
また、このときのCapacityProviderReservationのグラフは以下のようになりました。少し分かりにくいですが、最初と最後の平行なところが100です。100を上回ると必要なインスタンスが足りないのでスケールアウトし、反対に下回るとスケールインします。
まとめ
TensorFlow Servingを用いた推論APIサーバに対するgRPCリクエストの負荷分散を行うため、ECSとALBを利用したシステムを構築しました。
ALBを用いる方法はenvoyに比べ、
- 負荷に応じたオートスケールが容易
- 構築や運用も容易(ログも確認しやすい)
といったメリットがある反面、
- AWSでしか利用できない
- ALBを利用するためにTLSが必須
といったデメリットがあると感じました。
また、ECSのオートスケールで紹介したCASは2019年末に発表されたものであり、比較的新しい機能になります。推論サーバに限らず有用なので、今後もアップデートを期待したいです。
今回は負荷分散を行うことに重きをおいたので取り上げませんでしたが、GPUインスタンスやInf1インスタンスを用いた推論サーバについても試していきたいと思います。
-
学習は終わった。よし、API提供だ!Deep LearningのServing比較してみたに他のライブラリの紹介があります。 ↩
-
EnvoyとAmazon ECS Service Discoveryを利用したgRPCの負荷分散やAmazon EKSでgRPCサーバを運用するにはenvoyが利用されているようです。 ↩