4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

SageMakerのエンドポイントを呼び出すAPIをFastAPIを使って非同期処理させる

Posted at

はじめに

  • AWSのSageMakerに設置した推論エンドポイントを呼び出すAPIを実装するにあたって、推論処理に時間が掛かるため、API側を非同期処理させたいという要件がありました。
  • 上記の要件を満たすためにFastAPIを使って非同期処理をするように対応したので、その際のメモ書きになります。
  • ちなみにこちらにあるようにSageMakerのエンドポイントは60秒以内にレスポンスを返す必要があるので、注意しましょう。

準備

  • はじめにSageMaker上に推論処理を行うエンドポイントの設定を行います。
    • 今回はサンプルとして以下のようなコードを推論処理のエンドポイントとして設定します。
    • リクエストを受けると5秒間スリープしてメッセージを返却する単純な処理になります。
      • スリープする部分で時間のかかる推論処理を行うイメージです。
from fastapi import FastAPI
from time import sleep
import uvicorn

app = FastAPI()

@app.get('/ping')
async def ping():
    return {"message": "ok"}

@app.post('/invocations')
async def invocations():
    sleep(5)
    return {"message": "finish"}

if __name__ == '__main__':
    uvicorn.run('sagemaker_endpoint:app',
                host='0.0.0.0',
                port=8080,
                log_level='info')
  • また、エンドポイントは同時に2つ処理が行えるようにインスタンスを2個起動するように設定させておきます。

通常の呼び出し

  • はじめに、設置した推論エンドポイントを同期的に呼び出すAPIを作成して試してみます。
    • boto3を利用してsagemakerのクライアントオブジェクトに対してinvoke_endpointする感じになります。
import os
from fastapi import FastAPI, HTTPException
import uvicorn
import boto3
import json
import aiobotocore
from datetime import datetime

app = FastAPI()
ENDPOINT_NAME = os.environ['ENDPOINT_NAME']

@app.get('/')
async def index():
    try:
        sagemaker = boto3.client('sagemaker-runtime')
        sagemaker_response = sagemaker.invoke_endpoint(
            EndpointName='sagemaker-endpoint-test',
            Accept='application/json',
            ContentType='application/json',
            Body=json.dumps({'message': 'test'}))
    except Exception as e:
        raise HTTPException(status_code=500,
                            detail='Sagemaker invoke endpoint exception')

    response_body = sagemaker_response['Body']
    return json.load(response_body)

if __name__ == '__main__':
    uvicorn.run('main:app', host='0.0.0.0', port=3000, log_level='info')
  • こちらのAPIを同時に4回呼び出してみます。
    • asyncio.gatherを利用して非同期に呼び出してみます。
import aiohttp
import asyncio
from datetime import datetime

URL = 'http://localhost:3000'

async def main(id):
    async with aiohttp.ClientSession() as session:
        async with session.get(URL) as response:
            res_text = await response.text()
            return f"response:{res_text}  id:{id}"


if __name__ == '__main__':
    print(f"{datetime.now():%H:%M:%S} start")
    loop = asyncio.get_event_loop()
    gather = asyncio.gather(main(1), main(2), main(3), main(4))
    results = loop.run_until_complete(gather)
    for r in results:
        print(f"result: {r}")
    print(f"{datetime.now():%H:%M:%S} end")



  • インスタンスは2つ起動していますが、呼び出しがシーケンシャルに処理されるので、20秒かかります(5秒✕4回)。
11:47:11 start
result: response:{"message":"finish"}  id:1
result: response:{"message":"finish"}  id:2
result: response:{"message":"finish"}  id:3
result: response:{"message":"finish"}  id:4
11:47:31 end

非同期の呼び出し

  • 次に非同期に推論エンドポイントを呼び出すAPIを作成して試してみます。
  • 非同期の呼び出しにはaiobotocoreを利用して実装します。
    • boto3よりも低レベルの処理を行うことが出来るbotocoreをラップしているライブラリで、READMEにはs3のapiのみサポートしているとありますが、他のサービスでも利用できるでしょうとのことなので、試してみます。
  • 先程のAPIと大体おなじような感じですが、async / awaitを利用して実装してあります。
import os
from fastapi import FastAPI, HTTPException
import uvicorn
import boto3
import json
import aiobotocore
from datetime import datetime

app = FastAPI()
ENDPOINT_NAME = os.environ['ENDPOINT_NAME']


@app.get('/async')
async def index_async():
    print('receive requests')
    try:
        session = aiobotocore.get_session()
        async with session.create_client('sagemaker-runtime') as client:
            sagemaker_response = await client.invoke_endpoint(
                EndpointName=ENDPOINT_NAME,
                Accept='application/json',
                ContentType='application/json',
                Body=json.dumps({'message': 'test'}))
            response_body = sagemaker_response['Body']
            async with response_body as stream:
                data = await stream.read()
                return json.loads(data.decode())
    except Exception as e:
        print(e)
        raise HTTPException(status_code=500,
                            detail='Sagemaker invoke endpoint exception')

if __name__ == '__main__':
    uvicorn.run('main:app', host='0.0.0.0', port=3000, log_level='info')

  • では、同じようにAPIを同時に4回呼び出してみます。
  • レスポンスを待たずに推論エンドポイントの呼び出しが行われ、インスタンスが2つ起動しているので半分の10秒で処理が完了しています。
11:46:40 start
result: response:{"message":"finish"}  id:1
result: response:{"message":"finish"}  id:2
result: response:{"message":"finish"}  id:3
result: response:{"message":"finish"}  id:4
11:46:50 end

まとめ

  • aiobotocoreを利用することで、非同期にSageMakerに設置した推論エンドポイントを呼び出すことが出来ました。
  • fastapi + uvicornという構成で非同期に処理できるAPIを実装しても、AWSのリソースを呼び出す処理でI/O待ちが発生してしまっては意味がないので、他のAWSリソースを呼び出す際にも利用すると良さそうです。
  • 上記に記載したコードはこちらにアップしてありますので、ご参考まで。
4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?