LoginSignup
6
6

FastAPIのLambdaからAWS外のサービスでキャッシュとDB構築! + Github Actions

Last updated at Posted at 2023-11-29

概要

FastAPI、Pythonに詳しい方はご存じの方も多いかもしれません。
AWSのLambda関数と組み合わせるとどんな感じになるんだろう?と思って触り始めました。
色々学べたので、この機会に記事にしました!

こういったアーキテクチャが実現できるかな?という部分に焦点をおいています。また、可能な範囲でコストも抑える方針で進めました!

主な目次

  • パート1:アプリケーション作成
    • インフラ構築(CDK Typescript)
    • Lambda関数(FastAPIのアプリケーション)の作成
    • DBやcacheの構築の前に
    • supabaseによるDB構築
    • upstashによるcache構築
  • パート2:GitHub Actionsと各ツールの利用
    • GitHub Actionsのworkflowは作ったが・・・
    • Ruffの利用(Linterツール)
    • actの利用(ローカルでGitHub Actions)
    • stepciの利用(APIのテスト)
  • ふりかえりとgithubリポジトリの情報
  • この後やりたいこと

実行環境

  • OS : windows 11
  • Python:3.12
  • Nodejs : 20.9.0
  • aws cdk : 2.103.1

インフラ構築(CDK Typescript)

以下画像のAWS Cloudで囲ったリソースをCDK Typescriptで実装しました。Pythonで統一も考えましたが、慣れている方で進めました!!
AWSのDB関連のサービスが半透明なのは、使わずに構築できたからです!

image.png

Lambda関数(FastAPIのアプリケーション)の作成

FastAPIでアプリケーション作成します。Dockerイメージで動かすことにしました。理由は以下の2つです。

  • Dockerイメージを使ったLambda関数の作成経験を積みたい
  • zip形式の場合に、ファイルサイズの上限に引っかかるかもしれない

アプリケーションで実装した内容

アプリケーション自体は小規模です。アプリケーションで使用した各ライブラリのバージョンはrequirements.txtに記載の通りです。(記事の最後でリポジトリ情報を載せています。)

  • プロジェクト共通の設定
    • ミドルウェアの設定
      • エラー発生時のログ出力
      • loguruというライブラリを用いて、各エンドポイント実行時のログ出力
  • エンドポイント
    • /dummy
      • シンプルなJsonを返すエンドポイント
    • /movie
      • 今回のメイン。supabaseからのデータ取得およびupstashのキャッシュ利用
    • /docs
      • openAPIのドキュメント用のデフォルトPath(自分で作成していない)

Mangum

Lambda × FastAPIの組み合わせを通じて初めて目にしました!

FastAPIはAPIのエンドポイントを複数公開できますが、AWS Lambdaは1つだけしか公開できません。そこで、mangumというライブラリを使うことで、複数のエンドポイントを1つにまとめることができます。

パフォーマンスの面でもメリットがあるそうです。

Magnumを利用する際は、uvicornといったASGIのweb serverを起動するわけではなく、 FastAPIとLambda Proxy Integrationとの間に立ちイベントを変換します。
そのためオーバヘッドもほとんどゼロであり、Lambdaが適切にスケーリングすればパフォーマンスの心配はなく積極的に使っていけそうです。

AWS公式のハンズオンでも使っていました!

loguruを用いたログ出力

pythonのデフォルトでloggingというライブラリがあります。こちらを否定する訳ではなく、簡単に設定できることを求め、今回はloguruにしました。日本語による入門記事もあります。

実装はこちらの記事を参考にしました。
loguru_route_logging.pyの内容をmain.py(アプリケーションサーバ起動時のコマンドで指定するファイル)でmiddlewareに設定します。

loguru_route_logging.py
loguru_route_logging.py
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import time
import datetime
from loguru import logger
import sys
logger.remove()
logger.add(sys.stdout, colorize=True, format="{time:HH:mm:ss} | {level} | {message}")

class LoguruRouteLogging(BaseHTTPMiddleware):
    """各ルートの実行内容をログ出力
    API内で各エンドポイントにアクセスした際に、アクセス内容をログに残す
    Args:
        BaseHTTPMiddleware : リクエスト/レスポンスインタフェースに対するASGIミドルウェアを記述するための抽象クラス.
    """
    
    async def dispatch(self,request: Request, call_next) -> Response:
        before = time.time()         
        time_local = datetime.datetime.fromtimestamp(
            before, tz=datetime.timezone(datetime.timedelta(hours=9))
        )  
        logger.debug(time_local.strftime("%Y/%m/%d %H:%M:%S%Z"))  
        logger.debug(f"{request.method} {request.url}")
        logger.debug("Headers:")
        for name, value in request.headers.items():
            logger.debug(f"\t{name}: {value}")
        if await request.body():
                body = (await request.body()).decode("utf-8")  
        response = await call_next(request)
        return response
main.py(簡略化)
main.py
from fastapi import FastAPI
from src.core.logger.loguru_route_logging import LoguruRouteLogging
from mangum import Mangum
# ・・・・
app = FastAPI()
# ミドルウェアに設定
app.add_middleware(LoguruRouteLogging)

def handler(event: EventAny, context: EventAny):
    asgi_handler = Mangum(app)
    return asgi_handler(event, context)

Secrets Managerとの連携

AWSのLambdaで環境変数を設定できますが、DBやcacheサービスのパスワードは大切に扱いたいので、利用します。。以下の記事を参考に進めました。

値を取得するためのコード

どのキーの値を取得するのか、引数nameで受け取ります。

read_secret_manager.py
import base64

import boto3
from botocore.exceptions import ClientError

def read_value(name: str) -> str:
    """Get secret value From SecretManager."""
    secret_name = name
    region_name = "ap-northeast-1"

    session = boto3.session.Session()
    client = session.client(
        service_name="secretsmanager",
        region_name=region_name,
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name,
        )
        if "SecretString" in get_secret_value_response:
            secret_data = get_secret_value_response["SecretString"]
            return secret_data
        else:
            decoded_binary_secret = base64.b64decode(
                get_secret_value_response["SecretBinary"]
            ) 
    except ClientError as e:
        raise e

Lambda関数に権限付与

Secrets Managerに設定した値がLamabda関数上で読み取れないと、データ取得処理が実行できません。IAMポリシーを追加します。

デフォルトの権限が消えるので、注意!

本題ではないので、簡単に書きます。
CDKでLambda関数のみ定義した場合、デフォルトでcloudwatchに関してロググループ作成・書き込みの権限が付与されます。
ですが、addToRolePolicyという関数を使って、新たにLambda関数に対してポリシーを自分で設定した場合は、デフォルトの権限が消えてしまいます。

Lambda関数が関係するコード
// アプリケーション用のLambda関数
const fastApiappLambda = new lambda.Function(this, "fastapi-sample-container-lambda", {
  code: lambda.Code.fromEcrImage(appEcrRepository, {
    cmd: ["src.main.handler"],
    tag: "latest",
  }),
  functionName: "fastapi-sample-container-lambda",
  runtime: lambda.Runtime.FROM_IMAGE,
  handler: lambda.Handler.FROM_IMAGE,
  timeout: cdk.Duration.seconds(30),
  // layers: [powerToolsLayer],
  environment: {
    ENV: 'dev',
    TITLE: 'FastAPI-MyApp',
    UPSTASH_REDIS_REST_HOST: process.env.UPSTASH_REDIS_REST_HOST!,
    UPSTASH_REDIS_REST_PORT: process.env.UPSTASH_REDIS_REST_PORT!,
    SUPABASE_URL: process.env.SUPABASE_URL!,
  },
  role: fastApiLambdaRole,
});

// シークレットマネージャに関するポリシー
fastApiappLambda.addToRolePolicy(new iam.PolicyStatement({
  effect: iam.Effect.ALLOW,
  actions: ['secretsmanager:GetSecretValue'],
  resources: [
    secretUpstashPassword.secretArn,
    secretSupabaseKey.secretArn
  ],
}));

// cloudwatchに関するポリシー
fastApiappLambda.addToRolePolicy(new iam.PolicyStatement({
  effect: iam.Effect.ALLOW,
  actions: ['logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutLogEvents'],
  resources: ['arn:aws:logs:*:*:*'],
}));

API Gatewayとの連携

機密情報は扱ってないですが、皆が自由にアクセスできるのも、、、と思ったのでAPIキーによる認証付きのAPI Gatewayです。プロキシリソースについては、こちらの記事を参考にしました。

API Gateway作成に関係するコード
    const nameRestApi = "Rest API with Lambda";
    const restApi = new apigateway.RestApi(this, nameRestApi, {
      restApiName: `FastAPIGateway`,
      endpointTypes: [apigateway.EndpointType.REGIONAL],
      deployOptions: {
        stageName: 'dev',
      },
    });

    restApi.root.addProxy({
      defaultIntegration: new apigateway.LambdaIntegration(fastApiappLambda),
      anyMethod: true,
      defaultMethodOptions: { apiKeyRequired: true },
    });

    // api key:valueをシークレットにする方法
    const apiKey = restApi.addApiKey("fastapiAppApiKey", {
      apiKeyName: `fastapiApp-apiKey`,
      // value: ""
    }); // APIキーの値は未指定で自動作成

    // 使用量プラン
    const usagePlan = restApi.addUsagePlan("sampleApiUsagePlan", {
      name: 'MyUsagePlan',
      description: 'My API Usage Plan',
      throttle: {
        rateLimit: 50, // スロットリングのレート制限(リクエスト数)
        burstLimit: 100, // スロットリングのバースト制限(一度に許可される最大リクエスト数)
      },
      quota: {
        limit: 1000, // クォータの制限(リクエスト数)
        period: apigateway.Period.WEEK, // クォータの期間(MONTH, WEEK, DAY)
      },
    });
    usagePlan.addApiKey(apiKey);
    usagePlan.addApiStage({
      stage: restApi.deploymentStage
    });
このコードをデプロイすると、以下画像のようになります。

image.png

FastAPI側のアプリケーションでもURLの設定には手を加えていますが、この状態になれば、
URL:https://XXXXXX.execute-api.ap-northeast-1.amazonaws.com/dev/dummyとAPIキーでGETリクエストを送ると、FastAPI上のエンドポイント、/dummyへリクエストが送信できるようになりました。
image.png


DBやcacheの構築の前に・・・

この後データベースとキャッシュを用意しますが、RDSのランニングコストは馬鹿にできないです。。。
私は今年に入ってからsupabaseを使用しています。(詳細は後ほど)
また、キャッシュは最近知ったupstashという外部サービスを使ってみます。(詳細は後ほど)

接続情報の扱い
DBサービスやcacheサービスの接続情報は、github上にPushすることがないようご注意ください。

supabaseによるDB構築

「Firebaseにとって代わるオープンソース」という表現をされています。
今回はデータベースですが、StorageやEdge Function(自作APIの構築)もあります。
直感的な実装でデータ操作が行えて、javascript・Pythonなど、複数言語対応しています。DBとしてはpostgresです。

supabaseのテーブルからデータ取得

早速実装しますが、以下の内容を真似るだけです!

インスタンス定義
read_value関数は、Secret Managerのパートで紹介したコードです。

supabase.py
import os
from supabase import Client, create_client
from src.api.dependencies.read_secret_manager import read_value

def get_supabase_key() -> str:
    """Get supabase key From SecretManager."""
    # Secret managerから値取得
    return read_value(name="supabase-key")

def supabase()-> Client:
    """Define supabase client."""
    url: str = os.environ.get("SUPABASE_URL")
    key: str = get_supabase_key()     
    return create_client(url, key)

データ取得処理

endpoint.py
from src.api.dependencies.supabase import supabase
supabase_client.table("movies").select("*").execute()

alembicによるDBマイグレーション

先に取得処理を書きましたが、テーブル定義と反映はalembicを使います。
SQLAlchemyとセットで用いるDBマイグレーションツールです。
以下の2つの記事を参考にしました。

ざっくりですが、コマンドはinit、revison、upgradeの順に実行します。
revisionコマンド実行時に生成されたファイルに、DBに対する更新内容を記載します。

マイグレーションに関する実装
aabc35790c50_generate_initial_migration.py(更新箇所のみ)
def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('movies',
    sa.Column('id', sa.BigInteger(), nullable=False),
    sa.Column('name', sa.Text(), nullable=True),
    sa.Column('description', sa.Text(), nullable=True,unique=True),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_name'), 'movies', ['name'], unique=False)
    op.create_index(op.f('ix_id'), 'movies', ['id'], unique=True)
    # ### end Alembic commands ###

alembic.iniというファイルで1つ問題があります。
DBのURLを記載する箇所がありますが。特に考慮しなければ、DBのパスワードを含んだ情報がハードコーディングされてしまいます。。(.gitgnoreに含まれないファイル)
環境変数から値を読み込む方法は、以下を参考にしました。

更新した2ファイルとその説明 環境変数:`SQLALCHEMY_DATABASE_URL`の値を、`SQLALCHEMY_DATABASE_URL`というキーで利用する処理を追加しています。これを`alembic.ini`で読み込む形です。
env.py(一部抜粋)
import os
from dotenv import load_dotenv

load_dotenv()

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    config.set_section_option("alembic", "SQLALCHEMY_DATABASE_URL", os.environ.get("SQLALCHEMY_DATABASE_URL"))   
alembic.ini(一部抜粋)
[alembic]
sqlalchemy.url = %(SQLALCHEMY_DATABASE_URL)s

作成したmoviesテーブルにデータを10件登録しました。

登録したデータ

app側のgithubリポジトリ、db_ddl_dataというフォルダに登録したデータのcsvファイルを配置してあります。
image.png

upstashによるcache構築

自分できちんと実装する機会がなかったので、やってみました。以下のリンクの内容でサクッと構築できます。

Lambda関数のQuickstartもありました。

インスタンス定義

supabase.py
import redis
from redis import Redis

from src.api.dependencies.read_secret_manager import read_value
from src.core.config import settings

def get_redis_password() -> str:
    """Get Upstash Password From SecretManager."""
    return read_value(name="upstash-redis-rest-password")

def cache() -> Redis:
    """Definition redis instance."""

    return redis.Redis(
        host= settings.upstash_redis_rest_host,
        port= settings.upstash_redis_rest_port,
        password=get_redis_password(),
        ssl=True,
    )

データ取得処理(先ほどのsupabaseと同じ箇所)

  • upstash上に、キー:movies-pythonがあれば、キャッシュからデータを取得
  • upstash上に、キー:movies-pythonが無ければ、supabaseのテーブルからデータを取得後、pickle.dumpsを用いて、データをシリアライズして保存(※)
  • キー:movies-pythonは、30秒経過で破棄
endpoint.py
@router.get("/", response_model=list[MovieResponse])
async def index(
    redis_client: cache = Depends(cache), supabase_client: supabase = Depends(supabase)
) -> list[MovieResponse]:
    """Endpoint about use cache."""

    try:
        if (cached_data := redis_client.get("movies-python")) is not None:
            logger.info("Use cache")
            return pickle.loads(cached_data)
        logger.info("Get Data From DB")
        response = supabase_client.table("movies").select("*").execute()
        movies = response.data
        redis_client.set("movies-python", pickle.dumps(movies))
        redis_client.expire("movies-python", 30)
        return movies
    except Exception as e:
        traceback.print_exc()
        raise APIException(ErrorMessage.FAILED_FETCH_DATA_FROM_MOVIE_TABLE)

※データをシリアライズして保存、参考記事です。

キャッシュ導入してレスポンスまでの時間は短くできた?

短くできました。10件のデータでこれだけ差が発生しているので、もっとデータ量が多いとき、キャッシュのありがたさを実感できるな、と思いました。
(何度か実行してみて、おおよそ4-5倍短い時間でレスポンスが返ってきました。)

DBから取得した場合
取得前にupstashのCLIでキー:movies-pythonがないことを確認します。

確認結果

image.png

image.png

cacheから取得した場合
取得前にupstashのCLIでキー:movies-pythonが存在することを確認します。

確認結果

文字化けしてますが、データはありそうです。。。
image.png

image.png

GitHub Actionsのworkflowは作ったが・・・

上記アプリケーションについて、DockerイメージのBuild・Pushし、Lambda関数を更新するworkflowを作成しましたが、何度か実装経験があり、大した苦労なく完了しました。。。
どうせなら、もうちょっと実務に近い形にしたいと思い、以降の内容に取り組みました。

Ruffの利用(Linterツール)

Rust製のLinterツールです。使ったことがなかったので、この機会に触ってみました。動作を軽く確認して、Github Actionsのworkflowに組み込みます。

An extremely fast Python linter and code formatter, written in Rust.

image.png
確かに早そうです。

また、ルール一覧のページに記載の通り、Ruff supports over 700 lint rules,、だそうです!

適用ルールを決めるために、設定ファイルを用意しよう

プロジェクトのルートディレクトリに、pyproject.tomlを用意しました。ファイル名は制限があるので、詳しくはこちら、ご確認ください。

pyproject.toml
[tool.ruff]
# Enable pycodestyle (`E`) and Pyflakes (`F`) codes by default.
select = ["A", "B", "C", "D", "E", "F", "W", "ANN"]
ignore = [
    "B008", # do not perform function calls in argument defaults
    "C901", # too complex
    "B904", # raise from err
]
fixable = ["I001", "D201"]
target-version = "py312"
select , ignore , fixableの説明はこちら
  • select:括弧内の文字列を設定すると、検知対象です。(画像はdocsより)
    image.png
  • ignore:無視するルールの番号を設定します。
  • fixable::--fixオプションを付けて実行した際に、コード修正を行うルールを設定します。以下画像であれば、2つのルール違反のコードが修正されます。
    image.png

以下のコマンド実行で、現在の階層から再帰的に各ファイルをチェックします。

ruff check .

エラーがでてきました。修正or無視は、皆様の判断にお任せいたします!
image.png

actの利用(ローカルでGithub Actions)

Pushして挙動を確認、エラーがあればそれらを解消・・・と進める場合、workflow runsの数が大きめになることもあります。(場合によってはコストに影響?)
Push(やpull_requestなど)実行時の挙動を、ローカル確認できれば、開発も進めやすそうです。
それを実現できるのが、actです。Dockerさえあれば使用できます。
GitHub Actionsと"仲良くなる"ための練習方法というスライドにありました。

動かしてみよう

作成途中のyamlファイルですが、このファイルで検証してみます。

yamlファイル
name:  FastAPI App CI/CD Pipeline
run-name: ${{ github.actor }} is testing out GitHub Actions 🚀
on: [push]
jobs:
  Explore-GitHub-Actions:
    runs-on: ubuntu-latest
    steps:
      - run: echo "🎉 The job was automatically triggered by a ${{ github.event_name }} event."
      # Ruff Linter Test
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v2
      - uses: chartboost/ruff-action@v1
        with:
          src: ./src
          version: 0.1.6
      # Run FastAPI App
      - uses: actions/checkout@v3
      - name: Install Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

AWSサービスが関わるworkflowも確認する場合は、認証情報を含めた変数の設定など、もう少し準備が必要になりそうです。

dockerの起動後、lオプション付きで実行すると、workflowを検知してくれます。

sudo service docker status
* Docker is running
act -l
Stage  Job ID                  Job name                Workflow name               Workflow file  Events
0      Explore-GitHub-Actions  Explore-GitHub-Actions  FastAPI App CI/CD Pipeline  sample.yaml    push 

act pushを実行して、pushイベントの挙動を確認します。
2枚目の画像の上の方に、Ruffのエラーが表示され、Job failedとなっています。
image.png
image.png
Ruffのルールを全て満たせたら、成功になる?ということで、pyproject.tomlのignoreに、selectで設定したアルファベットを追加します。

pyproject.toml(一部)
select = ["A", "B", "C", "D", "E", "F", "W", "ANN"]
ignore = [
    "A",
    .....
    "ANN",
    "B008", # do not perform function calls in argument defaults

Job successedと出力されており、成功です!
image.png

実際にGithub ActionsにPushしたときと同じ結果になるの?ということで実際にPushしました。actの実行時と同じ結果でした!(コード状況が少し異なるので、error数差分あり)

Github Actionsの実行結果

Job失敗時
image.png
Job成功時
image.png

stepciの利用(テスト)

こちらも割と最近知った、文字通りCIツールです。こちらの書籍を読んでいた中で、dreddとschemathesisというAPIテストのライブラリを使用してました。他にあるかな~と思っていた所で見つけました。
こちらも挙動を確認してから、Github Actionsのworkflowに組み込みます。

シンプルなハンズオン

FastAPIとのIntegration

以下コマンドを実行すると、コマンドを実行した階層にworkflow.yamlというファイルが生成されます。
FastAPIの各エンドポイントの戻り値の型ヒントを設定しておくと、より正確に仕様を表したファイルが生成されます。

stepci generate http://127.0.0.1:8000/openapi.json
作成したワークフロー
version: "1.0"
name: FastAPI
config:
  http: "http://127.0.0.1:8001"
env:
  domainPort: "http://127.0.0.1:8001"
tests:
  default:
    name: Default
    steps:           
      - id: dummy__get
        name: Dummy
        http:
          url: ${{env.domainPort}}/dummy
          method: GET
          check:
            status: /^20/
            schema:
              $ref: "#/components/schemas/DummyResponse"
components:
  schemas:
    DummyResponse:
      properties:
        status:
          type: string
          title: Status
      type: object
      required:
        - status
      title: DummyResponse
      description: Response model to validate and return when performing dummy endpoint.

workflowの設定が完了したら、実行します。
Workflow passedとなって成功となりました。
image.png

まだ、シンプルなテストしか実装していないので、この後より本格的なテストを作成していきたいと思います。

ふりかえりとgithubリポジトリの情報

FastAPIをLambda関数として作成して知見を増やすのが、当初中心となる予定でしたが、
それ以外の内容の収穫が多かった印象です。(笑)
以下の3つは今後も利用する機会を増やしていきたいと思いました。

  • upstashによるキャッシュを利用したデータ取得
  • actによるローカルGithub Actions実行
  • stepciによるテスト実行

FastAPIについて、規模が大きいアプリケーションを作成していないので、題材が見つかったタイミングで取り組んでみたいと思います。

Lambda関数のイメージ更新について、workflow内ではコマンド1行で終わりましたが、もう少し踏み込んでいました。
lambrollというライブラリがあります。少し触ってみて、Revisionは作成できましたが、Lambda関数が使用するイメージの更新ができず。
自分の操作・理解に問題があると思うので、別の機会に再確認したいと思います。

今回の成果物について、以下リポジトリで確認いただけます。
アプリケーション側

インフラ側

この後やりたいこと

現段階で、修正したい、新たに取り組みたいのは以下の通りです。
日々少しずつ進めていきたいと思います!

  • CDKの実装修正(次のメジャーバージョンアップで廃止となる実装あり)
  • actでAWSサービスを含めたworkflowの検証
  • フォルダ構成の整理やloggingの深堀り
  • lambrollを用いたイメージ更新の正しい手順の確認
  • openAIを利用したAPIの構築
6
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
6