はじめに
- AWSのSageMakerに設置した推論エンドポイントを呼び出すAPIを実装するにあたって、推論処理に時間が掛かるため、API側を非同期処理させたいという要件がありました。
- 上記の要件を満たすためにFastAPIを使って非同期処理をするように対応したので、その際のメモ書きになります。
- ちなみにこちらにあるようにSageMakerのエンドポイントは60秒以内にレスポンスを返す必要があるので、注意しましょう。
準備
- はじめにSageMaker上に推論処理を行うエンドポイントの設定を行います。
- 今回はサンプルとして以下のようなコードを推論処理のエンドポイントとして設定します。
- リクエストを受けると5秒間スリープしてメッセージを返却する単純な処理になります。
- スリープする部分で時間のかかる推論処理を行うイメージです。
from fastapi import FastAPI
from time import sleep
import uvicorn
app = FastAPI()
@app.get('/ping')
async def ping():
return {"message": "ok"}
@app.post('/invocations')
async def invocations():
sleep(5)
return {"message": "finish"}
if __name__ == '__main__':
uvicorn.run('sagemaker_endpoint:app',
host='0.0.0.0',
port=8080,
log_level='info')
- また、エンドポイントは同時に2つ処理が行えるようにインスタンスを2個起動するように設定させておきます。
通常の呼び出し
- はじめに、設置した推論エンドポイントを同期的に呼び出すAPIを作成して試してみます。
- boto3を利用してsagemakerのクライアントオブジェクトに対してinvoke_endpointする感じになります。
import os
from fastapi import FastAPI, HTTPException
import uvicorn
import boto3
import json
import aiobotocore
from datetime import datetime
app = FastAPI()
ENDPOINT_NAME = os.environ['ENDPOINT_NAME']
@app.get('/')
async def index():
try:
sagemaker = boto3.client('sagemaker-runtime')
sagemaker_response = sagemaker.invoke_endpoint(
EndpointName='sagemaker-endpoint-test',
Accept='application/json',
ContentType='application/json',
Body=json.dumps({'message': 'test'}))
except Exception as e:
raise HTTPException(status_code=500,
detail='Sagemaker invoke endpoint exception')
response_body = sagemaker_response['Body']
return json.load(response_body)
if __name__ == '__main__':
uvicorn.run('main:app', host='0.0.0.0', port=3000, log_level='info')
- こちらのAPIを同時に4回呼び出してみます。
- asyncio.gatherを利用して非同期に呼び出してみます。
import aiohttp
import asyncio
from datetime import datetime
URL = 'http://localhost:3000'
async def main(id):
async with aiohttp.ClientSession() as session:
async with session.get(URL) as response:
res_text = await response.text()
return f"response:{res_text} id:{id}"
if __name__ == '__main__':
print(f"{datetime.now():%H:%M:%S} start")
loop = asyncio.get_event_loop()
gather = asyncio.gather(main(1), main(2), main(3), main(4))
results = loop.run_until_complete(gather)
for r in results:
print(f"result: {r}")
print(f"{datetime.now():%H:%M:%S} end")
- インスタンスは2つ起動していますが、呼び出しがシーケンシャルに処理されるので、20秒かかります(5秒✕4回)。
11:47:11 start
result: response:{"message":"finish"} id:1
result: response:{"message":"finish"} id:2
result: response:{"message":"finish"} id:3
result: response:{"message":"finish"} id:4
11:47:31 end
非同期の呼び出し
- 次に非同期に推論エンドポイントを呼び出すAPIを作成して試してみます。
- 非同期の呼び出しにはaiobotocoreを利用して実装します。
- boto3よりも低レベルの処理を行うことが出来るbotocoreをラップしているライブラリで、READMEにはs3のapiのみサポートしているとありますが、他のサービスでも利用できるでしょうとのことなので、試してみます。
- 先程のAPIと大体おなじような感じですが、async / awaitを利用して実装してあります。
import os
from fastapi import FastAPI, HTTPException
import uvicorn
import boto3
import json
import aiobotocore
from datetime import datetime
app = FastAPI()
ENDPOINT_NAME = os.environ['ENDPOINT_NAME']
@app.get('/async')
async def index_async():
print('receive requests')
try:
session = aiobotocore.get_session()
async with session.create_client('sagemaker-runtime') as client:
sagemaker_response = await client.invoke_endpoint(
EndpointName=ENDPOINT_NAME,
Accept='application/json',
ContentType='application/json',
Body=json.dumps({'message': 'test'}))
response_body = sagemaker_response['Body']
async with response_body as stream:
data = await stream.read()
return json.loads(data.decode())
except Exception as e:
print(e)
raise HTTPException(status_code=500,
detail='Sagemaker invoke endpoint exception')
if __name__ == '__main__':
uvicorn.run('main:app', host='0.0.0.0', port=3000, log_level='info')
- では、同じようにAPIを同時に4回呼び出してみます。
- レスポンスを待たずに推論エンドポイントの呼び出しが行われ、インスタンスが2つ起動しているので半分の10秒で処理が完了しています。
11:46:40 start
result: response:{"message":"finish"} id:1
result: response:{"message":"finish"} id:2
result: response:{"message":"finish"} id:3
result: response:{"message":"finish"} id:4
11:46:50 end
まとめ
- aiobotocoreを利用することで、非同期にSageMakerに設置した推論エンドポイントを呼び出すことが出来ました。
- fastapi + uvicornという構成で非同期に処理できるAPIを実装しても、AWSのリソースを呼び出す処理でI/O待ちが発生してしまっては意味がないので、他のAWSリソースを呼び出す際にも利用すると良さそうです。
- 上記に記載したコードはこちらにアップしてありますので、ご参考まで。