2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

アジアクエストAdvent Calendar 2024

Day 15

CDKでalembicによるDBマイグレーションを自動化する

Last updated at Posted at 2024-12-14

はじめに

CDKを使ってAWSリソースやソースコードをデプロイ出来ますが、DBスキーマの変更が入った場合DBへの反映までは自動でやってくれません。踏み台サーバーなどから手動でマイグレーションを実行するのも面倒だったので、カスタムリソースを使ってデプロイ時にDBマイグレーションも含めて自動実行するようにしてみました。

使用技術

項目 バージョン
Python 3.11
Postgresql 15.7
Alembic 1.14.0
poetry 1.8.4
AWS CDK 2.172.0

CDKはTypeScriptで、カスタムリソースでトリガーされるLambda関数はPythonで実装しています。

CDKを使うメリット

CloudFormationとの統合による自動ロールバック

CDKはCloudFormationを下層レイヤーで利用しており、マイグレーションをカスタムリソースとして登録できます。これにより、マイグレーション失敗時にはCloudFormationが自動で全体をロールバックする仕組みをそのまま活用可能です。他のツールチェーンやGitHub Actionsで同等の自動ロールバックを行うには、別途スクリプトやフック処理を実装する必要があります。

CIサーバーとのネットワーク要件を簡素化

GitHub ActionsなどのCIサービスからDBに接続してマイグレーションを実行する場合、CIサーバーが稼働する環境からDBが配置されているVPC内への通信経路を確保する必要があります。一方、今回のようにAWS CDKを活用することで、CDKスタック内のカスタムリソースを通じてLambda関数がDBと同じVPC内でマイグレーションを実行できます。その結果、ネットワーク経路確保のための複雑な構成や追加のインフラは不要になり、CIサーバー側でのネットワーク設定を大幅に簡素化できます。

仕組み

CDKが扱うCloudFormationは標準のリソースタイプでは、DBマイグレーションといった任意のアプリケーション固有ロジックを実行できません。そのため、カスタムリソースと専用のLambda関数を組み合わせて、インフラデプロイ時に自動的にDBマイグレーションを実行する仕組みを構築します。

カスタムリソースとProviderの仕組み

AWS CDKでは、CloudFormationに標準で存在しない処理を組み込みたい場合、「カスタムリソース」を利用します。
カスタムリソースは、CloudFormationのデプロイ処理中に任意のLambda関数などを呼び出せる「拡張ポイント」を提供し、インフラ構築と同時に追加のロジックを実行できます。

  • Providerの登録:
    まず、特定のLambda関数を「Provider」として定義します。これは「カスタムリソースが作成・更新・削除される時に呼び出されるハンドラー」として機能します。

  • カスタムリソースの定義:
    次に、CloudFormationテンプレートに「カスタムリソース」を記述します。
    このリソースが「作成(Create)」「更新(Update)」「削除(Delete)」されるたび、対応するライフサイクルイベントがProvider経由でLambda関数に通知され、所定の処理を実行します。

結果として、cdk deploy時に新規リソース作成や更新が行われると、デプロイ途中でLambdaがトリガーされ、DBマイグレーションなどの任意処理を自動的に実行できるようになります。

CDKで各リソースを宣言しているコードが以下です。

    const handler = new lambda_python.PythonFunction(this, 'dbMigrationFunction', {
      functionName: 'db-migration-function',
      runtime: lambda.Runtime.PYTHON_3_11,
      entry: path.join(__dirname, 'lambda-functions', 'db_migration'),
      handler: 'lambda_handler',
      vpc: vpc,
      vpcSubnets: { subnetType: SubnetType.PRIVATE_ISOLATED },
      environment: {
        DB_SECRET_NAME: dbSecret.secretArn,
      },
      timeout: cdk.Duration.seconds(300)
    });
    db.connections.allowDefaultPortFrom(handler);
    dbSecret.grantRead(handler);

    const provider = new Provider(this, "Provider", {
      onEventHandler: handler,
    });

    new CustomResource(this, "Custom::Migration", {
      serviceToken: provider.serviceToken,
      properties: { DummyValue: Date.now().toString() }, // ダミーのプロパティを設定してリソースの更新をトリガー
    });

CustomResourceのpropertiesに現在時刻のダミー値を持たせて、デプロイのたびにDBマイグレーションの関数が起動するようになっています。(後述しますが、反省点でもあります...)

Lambda関数の実装

DBマイグレーションを実際に行うかどうかはLambda関数の処理内で実行されるAlembicが制御しています。
以下がLambda関数のコードです。

import logging
import os
import subprocess
import json
import boto3

DB_SECRET_NAME = os.environ.get('DB_SECRET_NAME')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info(f"event: {event}")

    db_secret =  _get_secret_string() if DB_SECRET_NAME else {}

    database = db_secret.get("database", "postgresql+psycopg2")
    db_user = db_secret.get("username", "postgres")
    db_pass = db_secret.get("password", "password")
    db_host = db_secret.get("host", "localhost")
    db_port = str(db_secret.get("port", 5432))
    db_name = db_secret.get("dbname", "db_migration")

    logger.info(f"URL: {database}://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}")

    os.environ['DATABASE'] = database
    os.environ['DB_USER'] = db_user
    os.environ['DB_PASSWORD'] = db_pass
    os.environ['DB_HOST'] = db_host
    os.environ['DB_PORT'] = db_port
    os.environ['DB_NAME'] = db_name

    # alembic.iniファイルの環境変数を設定
    alembic_env = {
        **os.environ,
        'DATABASE': database,
        'DB_USER': db_user,
        'DB_PASSWORD': db_pass,
        'DB_HOST': db_host,
        'DB_PORT': db_port,
        'DB_NAME': db_name
    }

    # カスタムリソースの物理IDを設定
    # 物理IDとは、リソースの一意な識別子で、リソースの作成時に指定する
    request_type = event.get('RequestType', {})
        # もともとCFNから渡されるPhysicalResourceIdを参照
    physical_resource_id = event.get('PhysicalResourceId')

    if request_type == 'Create':
        # Createの場合のみ新しいIDを生成または決定
        physical_resource_id = '0c00zzzz-8b4f-4857-a0ee-70783364aad6'  # 生成した固有ID

    logger.info(f"physical_resource_id: {physical_resource_id}")
    logger.info(f"RequestType: {request_type}")

    # RequestTypeがCreateまたはUpdateの場合はマイグレーションを実行
    if request_type == 'Delete':
        return {
            'PhysicalResourceId': physical_resource_id,
        }

    # alembic.iniファイルの環境変数を設定
    try:
        old_version = _get_current_version(alembic_env)
        logger.info(f"Current version: {old_version}")

        result = subprocess.run(
            ["python3", "-m", "alembic", "upgrade", "head"],
            env=alembic_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        logger.info(result.stdout)

        if result.returncode != 0:
            error_message = f"Migration failed with stderr: {result.stderr}"
            logger.error(error_message)
            raise Exception(error_message)

        new_version = _get_current_version(alembic_env)
        logger.info(f"Migration success from {old_version} to {new_version}")

    except Exception as error:
        logger.info(result.stdout)
        logger.error(f"Migration execution failed: {error}")
        raise  error # エラーを再スローしてロールバックをトリガー

    return {
        'PhysicalResourceId': physical_resource_id,
    }

def _get_secret_string():

    try:
        client = boto3.client('secretsmanager')
        response = client.get_secret_value(SecretId=DB_SECRET_NAME)
        secret_value = response.get("SecretString", "{}")
        return json.loads(secret_value)
    except Exception as error:
        logger.error(f"Failed to get secret string: {error}")
        raise error

def _get_current_version(alembic_env):
    try:
        result = subprocess.run(
            ["python3", "-m", "alembic", "current"],
            env=alembic_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        return result.stdout.strip()
    except Exception as error:
        logger.info(result.stdout)
        logger.error(f"Failed to get current version: {error}")
        raise error

Lambda関数内でAlembicを実行する場合、マイグレーションファイルやライブラリをLambda関数の実行環境(ランタイム)に含める必要があります。そのため、cdk deploy の前に以下の2つの処理をnpmスクリプトで自動化します。

  1. Lambdaハンドラーが格納されたディレクトリに、poetryを用いてrequirements.txtを出力する
  2. 同ディレクトリ内にAlembic実行に必要なファイル群をコピーする

これらのファイルがLambda関数のコードと同一ディレクトリに存在すれば、CDKが自動的にそれらをLambdaの実行パッケージに含めてデプロイします。

"export-poetry": "poetry export -f requirements.txt --output lib/lambda-functions/db_migration/requirements.txt --without-hashes",
"copy-migration-files": "sh ./scripts/copy-migration-files.sh",
"deploy": "npm run export-poetry && npm run copy-migration-files && cdk deploy"

ソースコード

動作確認

何もテーブルを作成していないところにシンプルなUserテーブルを作成してみます。

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.autogenerate.compare] Detected added table 'users'
INFO  [alembic.autogenerate.compare] Detected added index ''ix_users_email'' on '('email',)'
INFO  [alembic.autogenerate.compare] Detected added index ''ix_users_id'' on '('id',)'
INFO  [alembic.autogenerate.compare] Detected added index ''ix_users_name'' on '('name',)'
  Generating /home/akis/dev/dbmigration/migrations/versions/32f8252e73b6_.py ...  done

デプロイ用のnpm scriptを実行します。

$ npm run deploy

usersテーブルが作成されました。
image.png

スキーマもid, name, email のカラムがあります。
image.png

さらにusersテーブルのカラム名をusernameに変更してみます。

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, index=True)
    email = Column(String, unique=True, index=True)
$ alembic revision --autogenerate
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.ddl.postgresql] Detected sequence named 'users_id_seq' as owned by integer column 'users(id)', assuming SERIAL and omitting
INFO  [alembic.autogenerate.compare] Detected added column 'users.username'
INFO  [alembic.autogenerate.compare] Detected removed index 'ix_users_name' on 'users'
INFO  [alembic.autogenerate.compare] Detected added index ''ix_users_username'' on '('username',)'
INFO  [alembic.autogenerate.compare] Detected removed column 'users.name'
  Generating /home/akis/dev/dbmigration/migrations/versions/b4818906d0f9_.py ...  done
npm run deploy

カラム名がnameがusernameに変更されています。
image.png

実装の反省点

カスタムリソーストリガーの改善
今回は、カスタムリソースのpropertiesに毎回異なるダミー値を与えて更新イベントを発生させ、そのたびにLambda関数を起動してマイグレーションを実行する方法を取っています。
しかし、この方法だとDBスキーマに変更がない場合でもLambdaが実行されるため、無駄な実行が発生します。Alembic側で変更がなければ実際にはスキーマ更新は起こりませんが、リソースやコストの観点から、スキーマ変更があった場合のみカスタムリソースが更新されるような仕組みに改良すべきでした。

参考

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?