概要
FastAPI、Pythonに詳しい方はご存じの方も多いかもしれません。
AWSのLambda関数と組み合わせるとどんな感じになるんだろう?と思って触り始めました。
色々学べたので、この機会に記事にしました!
こういったアーキテクチャが実現できるかな?という部分に焦点をおいています。また、可能な範囲でコストも抑える方針で進めました!
主な目次
- パート1:アプリケーション作成
- インフラ構築(CDK Typescript)
- Lambda関数(FastAPIのアプリケーション)の作成
- DBやcacheの構築の前に
- supabaseによるDB構築
- upstashによるcache構築
- パート2:GitHub Actionsと各ツールの利用
- GitHub Actionsのworkflowは作ったが・・・
- Ruffの利用(Linterツール)
- actの利用(ローカルでGitHub Actions)
- stepciの利用(APIのテスト)
- ふりかえりとgithubリポジトリの情報
- この後やりたいこと
実行環境
- OS : windows 11
- Python:3.12
- Nodejs : 20.9.0
- aws cdk : 2.103.1
インフラ構築(CDK Typescript)
以下画像のAWS Cloud
で囲ったリソースをCDK Typescriptで実装しました。Pythonで統一も考えましたが、慣れている方で進めました!!
AWSのDB関連のサービスが半透明なのは、使わずに構築できたからです!
Lambda関数(FastAPIのアプリケーション)の作成
FastAPIでアプリケーション作成します。Dockerイメージで動かすことにしました。理由は以下の2つです。
- Dockerイメージを使ったLambda関数の作成経験を積みたい
- zip形式の場合に、ファイルサイズの上限に引っかかるかもしれない
アプリケーションで実装した内容
アプリケーション自体は小規模です。アプリケーションで使用した各ライブラリのバージョンはrequirements.txt
に記載の通りです。(記事の最後でリポジトリ情報を載せています。)
- プロジェクト共通の設定
- ミドルウェアの設定
- エラー発生時のログ出力
- loguruというライブラリを用いて、各エンドポイント実行時のログ出力
- ミドルウェアの設定
- エンドポイント
- /dummy
- シンプルなJsonを返すエンドポイント
- /movie
- 今回のメイン。supabaseからのデータ取得およびupstashのキャッシュ利用
- /docs
- openAPIのドキュメント用のデフォルトPath(自分で作成していない)
- /dummy
Mangum
Lambda × FastAPIの組み合わせを通じて初めて目にしました!
FastAPIはAPIのエンドポイントを複数公開できますが、AWS Lambdaは1つだけしか公開できません。そこで、mangumというライブラリを使うことで、複数のエンドポイントを1つにまとめることができます。
パフォーマンスの面でもメリットがあるそうです。
Magnumを利用する際は、uvicornといったASGIのweb serverを起動するわけではなく、 FastAPIとLambda Proxy Integrationとの間に立ちイベントを変換します。
そのためオーバヘッドもほとんどゼロであり、Lambdaが適切にスケーリングすればパフォーマンスの心配はなく積極的に使っていけそうです。
AWS公式のハンズオンでも使っていました!
loguruを用いたログ出力
pythonのデフォルトでloggingというライブラリがあります。こちらを否定する訳ではなく、簡単に設定できることを求め、今回はloguruにしました。日本語による入門記事もあります。
実装はこちらの記事を参考にしました。
loguru_route_logging.py
の内容をmain.py
(アプリケーションサーバ起動時のコマンドで指定するファイル)でmiddlewareに設定します。
loguru_route_logging.py
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import time
import datetime
from loguru import logger
import sys
logger.remove()
logger.add(sys.stdout, colorize=True, format="{time:HH:mm:ss} | {level} | {message}")
class LoguruRouteLogging(BaseHTTPMiddleware):
"""各ルートの実行内容をログ出力
API内で各エンドポイントにアクセスした際に、アクセス内容をログに残す
Args:
BaseHTTPMiddleware : リクエスト/レスポンスインタフェースに対するASGIミドルウェアを記述するための抽象クラス.
"""
async def dispatch(self,request: Request, call_next) -> Response:
before = time.time()
time_local = datetime.datetime.fromtimestamp(
before, tz=datetime.timezone(datetime.timedelta(hours=9))
)
logger.debug(time_local.strftime("%Y/%m/%d %H:%M:%S%Z"))
logger.debug(f"{request.method} {request.url}")
logger.debug("Headers:")
for name, value in request.headers.items():
logger.debug(f"\t{name}: {value}")
if await request.body():
body = (await request.body()).decode("utf-8")
response = await call_next(request)
return response
main.py(簡略化)
from fastapi import FastAPI
from src.core.logger.loguru_route_logging import LoguruRouteLogging
from mangum import Mangum
# ・・・・
app = FastAPI()
# ミドルウェアに設定
app.add_middleware(LoguruRouteLogging)
def handler(event: EventAny, context: EventAny):
asgi_handler = Mangum(app)
return asgi_handler(event, context)
Secrets Managerとの連携
AWSのLambdaで環境変数を設定できますが、DBやcacheサービスのパスワードは大切に扱いたいので、利用します。。以下の記事を参考に進めました。
値を取得するためのコード
どのキーの値を取得するのか、引数name
で受け取ります。
import base64
import boto3
from botocore.exceptions import ClientError
def read_value(name: str) -> str:
"""Get secret value From SecretManager."""
secret_name = name
region_name = "ap-northeast-1"
session = boto3.session.Session()
client = session.client(
service_name="secretsmanager",
region_name=region_name,
)
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name,
)
if "SecretString" in get_secret_value_response:
secret_data = get_secret_value_response["SecretString"]
return secret_data
else:
decoded_binary_secret = base64.b64decode(
get_secret_value_response["SecretBinary"]
)
except ClientError as e:
raise e
Lambda関数に権限付与
Secrets Managerに設定した値がLamabda関数上で読み取れないと、データ取得処理が実行できません。IAMポリシーを追加します。
デフォルトの権限が消えるので、注意!
本題ではないので、簡単に書きます。
CDKでLambda関数のみ定義した場合、デフォルトでcloudwatchに関してロググループ作成・書き込みの権限が付与されます。
ですが、addToRolePolicy
という関数を使って、新たにLambda関数に対してポリシーを自分で設定した場合は、デフォルトの権限が消えてしまいます。
Lambda関数が関係するコード
// アプリケーション用のLambda関数
const fastApiappLambda = new lambda.Function(this, "fastapi-sample-container-lambda", {
code: lambda.Code.fromEcrImage(appEcrRepository, {
cmd: ["src.main.handler"],
tag: "latest",
}),
functionName: "fastapi-sample-container-lambda",
runtime: lambda.Runtime.FROM_IMAGE,
handler: lambda.Handler.FROM_IMAGE,
timeout: cdk.Duration.seconds(30),
// layers: [powerToolsLayer],
environment: {
ENV: 'dev',
TITLE: 'FastAPI-MyApp',
UPSTASH_REDIS_REST_HOST: process.env.UPSTASH_REDIS_REST_HOST!,
UPSTASH_REDIS_REST_PORT: process.env.UPSTASH_REDIS_REST_PORT!,
SUPABASE_URL: process.env.SUPABASE_URL!,
},
role: fastApiLambdaRole,
});
// シークレットマネージャに関するポリシー
fastApiappLambda.addToRolePolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['secretsmanager:GetSecretValue'],
resources: [
secretUpstashPassword.secretArn,
secretSupabaseKey.secretArn
],
}));
// cloudwatchに関するポリシー
fastApiappLambda.addToRolePolicy(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutLogEvents'],
resources: ['arn:aws:logs:*:*:*'],
}));
API Gatewayとの連携
機密情報は扱ってないですが、皆が自由にアクセスできるのも、、、と思ったのでAPIキーによる認証付きのAPI Gatewayです。プロキシリソースについては、こちらの記事を参考にしました。
API Gateway作成に関係するコード
const nameRestApi = "Rest API with Lambda";
const restApi = new apigateway.RestApi(this, nameRestApi, {
restApiName: `FastAPIGateway`,
endpointTypes: [apigateway.EndpointType.REGIONAL],
deployOptions: {
stageName: 'dev',
},
});
restApi.root.addProxy({
defaultIntegration: new apigateway.LambdaIntegration(fastApiappLambda),
anyMethod: true,
defaultMethodOptions: { apiKeyRequired: true },
});
// api key:valueをシークレットにする方法
const apiKey = restApi.addApiKey("fastapiAppApiKey", {
apiKeyName: `fastapiApp-apiKey`,
// value: ""
}); // APIキーの値は未指定で自動作成
// 使用量プラン
const usagePlan = restApi.addUsagePlan("sampleApiUsagePlan", {
name: 'MyUsagePlan',
description: 'My API Usage Plan',
throttle: {
rateLimit: 50, // スロットリングのレート制限(リクエスト数)
burstLimit: 100, // スロットリングのバースト制限(一度に許可される最大リクエスト数)
},
quota: {
limit: 1000, // クォータの制限(リクエスト数)
period: apigateway.Period.WEEK, // クォータの期間(MONTH, WEEK, DAY)
},
});
usagePlan.addApiKey(apiKey);
usagePlan.addApiStage({
stage: restApi.deploymentStage
});
FastAPI側のアプリケーションでもURLの設定には手を加えていますが、この状態になれば、
URL:https://XXXXXX.execute-api.ap-northeast-1.amazonaws.com/dev/dummy
とAPIキーでGETリクエストを送ると、FastAPI上のエンドポイント、/dummy
へリクエストが送信できるようになりました。
DBやcacheの構築の前に・・・
この後データベースとキャッシュを用意しますが、RDSのランニングコストは馬鹿にできないです。。。
私は今年に入ってからsupabaseを使用しています。(詳細は後ほど)
また、キャッシュは最近知ったupstashという外部サービスを使ってみます。(詳細は後ほど)
接続情報の扱い
DBサービスやcacheサービスの接続情報は、github上にPushすることがないようご注意ください。
supabaseによるDB構築
「Firebaseにとって代わるオープンソース」という表現をされています。
今回はデータベースですが、StorageやEdge Function(自作APIの構築)もあります。
直感的な実装でデータ操作が行えて、javascript・Pythonなど、複数言語対応しています。DBとしてはpostgresです。
supabaseのテーブルからデータ取得
早速実装しますが、以下の内容を真似るだけです!
インスタンス定義
read_value関数は、Secret Managerのパートで紹介したコードです。
import os
from supabase import Client, create_client
from src.api.dependencies.read_secret_manager import read_value
def get_supabase_key() -> str:
"""Get supabase key From SecretManager."""
# Secret managerから値取得
return read_value(name="supabase-key")
def supabase()-> Client:
"""Define supabase client."""
url: str = os.environ.get("SUPABASE_URL")
key: str = get_supabase_key()
return create_client(url, key)
データ取得処理
from src.api.dependencies.supabase import supabase
supabase_client.table("movies").select("*").execute()
alembicによるDBマイグレーション
先に取得処理を書きましたが、テーブル定義と反映はalembicを使います。
SQLAlchemyとセットで用いるDBマイグレーションツールです。
以下の2つの記事を参考にしました。
ざっくりですが、コマンドはinit、revison、upgradeの順に実行します。
revisionコマンド実行時に生成されたファイルに、DBに対する更新内容を記載します。
マイグレーションに関する実装
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('movies',
sa.Column('id', sa.BigInteger(), nullable=False),
sa.Column('name', sa.Text(), nullable=True),
sa.Column('description', sa.Text(), nullable=True,unique=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_name'), 'movies', ['name'], unique=False)
op.create_index(op.f('ix_id'), 'movies', ['id'], unique=True)
# ### end Alembic commands ###
alembic.ini
というファイルで1つ問題があります。
DBのURLを記載する箇所がありますが。特に考慮しなければ、DBのパスワードを含んだ情報がハードコーディングされてしまいます。。(.gitgnore
に含まれないファイル)
環境変数から値を読み込む方法は、以下を参考にしました。
更新した2ファイルとその説明
環境変数:`SQLALCHEMY_DATABASE_URL`の値を、`SQLALCHEMY_DATABASE_URL`というキーで利用する処理を追加しています。これを`alembic.ini`で読み込む形です。import os
from dotenv import load_dotenv
load_dotenv()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
config.set_section_option("alembic", "SQLALCHEMY_DATABASE_URL", os.environ.get("SQLALCHEMY_DATABASE_URL"))
[alembic]
sqlalchemy.url = %(SQLALCHEMY_DATABASE_URL)s
作成したmoviesテーブルにデータを10件登録しました。
upstashによるcache構築
自分できちんと実装する機会がなかったので、やってみました。以下のリンクの内容でサクッと構築できます。
Lambda関数のQuickstartもありました。
インスタンス定義
import redis
from redis import Redis
from src.api.dependencies.read_secret_manager import read_value
from src.core.config import settings
def get_redis_password() -> str:
"""Get Upstash Password From SecretManager."""
return read_value(name="upstash-redis-rest-password")
def cache() -> Redis:
"""Definition redis instance."""
return redis.Redis(
host= settings.upstash_redis_rest_host,
port= settings.upstash_redis_rest_port,
password=get_redis_password(),
ssl=True,
)
データ取得処理(先ほどのsupabaseと同じ箇所)
- upstash上に、キー:
movies-python
があれば、キャッシュからデータを取得 - upstash上に、キー:
movies-python
が無ければ、supabaseのテーブルからデータを取得後、pickle.dumpsを用いて、データをシリアライズして保存(※) - キー:
movies-python
は、30秒経過で破棄
@router.get("/", response_model=list[MovieResponse])
async def index(
redis_client: cache = Depends(cache), supabase_client: supabase = Depends(supabase)
) -> list[MovieResponse]:
"""Endpoint about use cache."""
try:
if (cached_data := redis_client.get("movies-python")) is not None:
logger.info("Use cache")
return pickle.loads(cached_data)
logger.info("Get Data From DB")
response = supabase_client.table("movies").select("*").execute()
movies = response.data
redis_client.set("movies-python", pickle.dumps(movies))
redis_client.expire("movies-python", 30)
return movies
except Exception as e:
traceback.print_exc()
raise APIException(ErrorMessage.FAILED_FETCH_DATA_FROM_MOVIE_TABLE)
※データをシリアライズして保存、参考記事です。
キャッシュ導入してレスポンスまでの時間は短くできた?
短くできました。10件のデータでこれだけ差が発生しているので、もっとデータ量が多いとき、キャッシュのありがたさを実感できるな、と思いました。
(何度か実行してみて、おおよそ4-5倍短い時間でレスポンスが返ってきました。)
DBから取得した場合
取得前にupstashのCLIでキー:movies-python
がないことを確認します。
cacheから取得した場合
取得前にupstashのCLIでキー:movies-python
が存在することを確認します。
GitHub Actionsのworkflowは作ったが・・・
上記アプリケーションについて、DockerイメージのBuild・Pushし、Lambda関数を更新するworkflowを作成しましたが、何度か実装経験があり、大した苦労なく完了しました。。。
どうせなら、もうちょっと実務に近い形にしたいと思い、以降の内容に取り組みました。
Ruffの利用(Linterツール)
Rust製のLinterツールです。使ったことがなかったので、この機会に触ってみました。動作を軽く確認して、Github Actionsのworkflowに組み込みます。
An extremely fast Python linter and code formatter, written in Rust.
また、ルール一覧のページに記載の通り、Ruff supports over 700 lint rules,
、だそうです!
適用ルールを決めるために、設定ファイルを用意しよう
プロジェクトのルートディレクトリに、pyproject.toml
を用意しました。ファイル名は制限があるので、詳しくはこちら、ご確認ください。
[tool.ruff]
# Enable pycodestyle (`E`) and Pyflakes (`F`) codes by default.
select = ["A", "B", "C", "D", "E", "F", "W", "ANN"]
ignore = [
"B008", # do not perform function calls in argument defaults
"C901", # too complex
"B904", # raise from err
]
fixable = ["I001", "D201"]
target-version = "py312"
select , ignore , fixableの説明はこちら
以下のコマンド実行で、現在の階層から再帰的に各ファイルをチェックします。
ruff check .
エラーがでてきました。修正or無視は、皆様の判断にお任せいたします!
actの利用(ローカルでGithub Actions)
Pushして挙動を確認、エラーがあればそれらを解消・・・と進める場合、workflow runsの数が大きめになることもあります。(場合によってはコストに影響?)
Push(やpull_requestなど)実行時の挙動を、ローカル確認できれば、開発も進めやすそうです。
それを実現できるのが、actです。Dockerさえあれば使用できます。
GitHub Actionsと"仲良くなる"ための練習方法というスライドにありました。
動かしてみよう
作成途中のyamlファイルですが、このファイルで検証してみます。
yamlファイル
name: FastAPI App CI/CD Pipeline
run-name: ${{ github.actor }} is testing out GitHub Actions 🚀
on: [push]
jobs:
Explore-GitHub-Actions:
runs-on: ubuntu-latest
steps:
- run: echo "🎉 The job was automatically triggered by a ${{ github.event_name }} event."
# Ruff Linter Test
- uses: actions/checkout@v3
- uses: actions/setup-python@v2
- uses: chartboost/ruff-action@v1
with:
src: ./src
version: 0.1.6
# Run FastAPI App
- uses: actions/checkout@v3
- name: Install Python
uses: actions/setup-python@v4
with:
python-version: "3.12"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
AWSサービスが関わるworkflowも確認する場合は、認証情報を含めた変数の設定など、もう少し準備が必要になりそうです。
dockerの起動後、lオプション付きで実行すると、workflowを検知してくれます。
sudo service docker status
* Docker is running
act -l
Stage Job ID Job name Workflow name Workflow file Events
0 Explore-GitHub-Actions Explore-GitHub-Actions FastAPI App CI/CD Pipeline sample.yaml push
act push
を実行して、pushイベントの挙動を確認します。
2枚目の画像の上の方に、Ruffのエラーが表示され、Job failed
となっています。
Ruffのルールを全て満たせたら、成功になる?ということで、pyproject.toml
のignoreに、selectで設定したアルファベットを追加します。
select = ["A", "B", "C", "D", "E", "F", "W", "ANN"]
ignore = [
"A",
.....
"ANN",
"B008", # do not perform function calls in argument defaults
実際にGithub ActionsにPushしたときと同じ結果になるの?ということで実際にPushしました。actの実行時と同じ結果でした!(コード状況が少し異なるので、error数差分あり)
stepciの利用(テスト)
こちらも割と最近知った、文字通りCIツールです。こちらの書籍を読んでいた中で、dreddとschemathesisというAPIテストのライブラリを使用してました。他にあるかな~と思っていた所で見つけました。
こちらも挙動を確認してから、Github Actionsのworkflowに組み込みます。
シンプルなハンズオン
FastAPIとのIntegration
以下コマンドを実行すると、コマンドを実行した階層にworkflow.yamlというファイルが生成されます。
FastAPIの各エンドポイントの戻り値の型ヒントを設定しておくと、より正確に仕様を表したファイルが生成されます。
stepci generate http://127.0.0.1:8000/openapi.json
作成したワークフロー
version: "1.0"
name: FastAPI
config:
http: "http://127.0.0.1:8001"
env:
domainPort: "http://127.0.0.1:8001"
tests:
default:
name: Default
steps:
- id: dummy__get
name: Dummy
http:
url: ${{env.domainPort}}/dummy
method: GET
check:
status: /^20/
schema:
$ref: "#/components/schemas/DummyResponse"
components:
schemas:
DummyResponse:
properties:
status:
type: string
title: Status
type: object
required:
- status
title: DummyResponse
description: Response model to validate and return when performing dummy endpoint.
workflowの設定が完了したら、実行します。
Workflow passed
となって成功となりました。
まだ、シンプルなテストしか実装していないので、この後より本格的なテストを作成していきたいと思います。
ふりかえりとgithubリポジトリの情報
FastAPIをLambda関数として作成して知見を増やすのが、当初中心となる予定でしたが、
それ以外の内容の収穫が多かった印象です。(笑)
以下の3つは今後も利用する機会を増やしていきたいと思いました。
- upstashによるキャッシュを利用したデータ取得
- actによるローカルGithub Actions実行
- stepciによるテスト実行
FastAPIについて、規模が大きいアプリケーションを作成していないので、題材が見つかったタイミングで取り組んでみたいと思います。
Lambda関数のイメージ更新について、workflow内ではコマンド1行で終わりましたが、もう少し踏み込んでいました。
lambroll
というライブラリがあります。少し触ってみて、Revisionは作成できましたが、Lambda関数が使用するイメージの更新ができず。
自分の操作・理解に問題があると思うので、別の機会に再確認したいと思います。
今回の成果物について、以下リポジトリで確認いただけます。
アプリケーション側
インフラ側
この後やりたいこと
現段階で、修正したい、新たに取り組みたいのは以下の通りです。
日々少しずつ進めていきたいと思います!
- CDKの実装修正(次のメジャーバージョンアップで廃止となる実装あり)
- actでAWSサービスを含めたworkflowの検証
- フォルダ構成の整理やloggingの深堀り
- lambrollを用いたイメージ更新の正しい手順の確認
- openAIを利用したAPIの構築