はじめに
CDKを使ってAWSリソースやソースコードをデプロイ出来ますが、DBスキーマの変更が入った場合DBへの反映までは自動でやってくれません。踏み台サーバーなどから手動でマイグレーションを実行するのも面倒だったので、カスタムリソースを使ってデプロイ時にDBマイグレーションも含めて自動実行するようにしてみました。
使用技術
項目 | バージョン |
---|---|
Python | 3.11 |
Postgresql | 15.7 |
Alembic | 1.14.0 |
poetry | 1.8.4 |
AWS CDK | 2.172.0 |
CDKはTypeScriptで、カスタムリソースでトリガーされるLambda関数はPythonで実装しています。
CDKを使うメリット
CloudFormationとの統合による自動ロールバック
CDKはCloudFormationを下層レイヤーで利用しており、マイグレーションをカスタムリソースとして登録できます。これにより、マイグレーション失敗時にはCloudFormationが自動で全体をロールバックする仕組みをそのまま活用可能です。他のツールチェーンやGitHub Actionsで同等の自動ロールバックを行うには、別途スクリプトやフック処理を実装する必要があります。
CIサーバーとのネットワーク要件を簡素化
GitHub ActionsなどのCIサービスからDBに接続してマイグレーションを実行する場合、CIサーバーが稼働する環境からDBが配置されているVPC内への通信経路を確保する必要があります。一方、今回のようにAWS CDKを活用することで、CDKスタック内のカスタムリソースを通じてLambda関数がDBと同じVPC内でマイグレーションを実行できます。その結果、ネットワーク経路確保のための複雑な構成や追加のインフラは不要になり、CIサーバー側でのネットワーク設定を大幅に簡素化できます。
仕組み
CDKが扱うCloudFormationは標準のリソースタイプでは、DBマイグレーションといった任意のアプリケーション固有ロジックを実行できません。そのため、カスタムリソースと専用のLambda関数を組み合わせて、インフラデプロイ時に自動的にDBマイグレーションを実行する仕組みを構築します。
カスタムリソースとProviderの仕組み
AWS CDKでは、CloudFormationに標準で存在しない処理を組み込みたい場合、「カスタムリソース」を利用します。
カスタムリソースは、CloudFormationのデプロイ処理中に任意のLambda関数などを呼び出せる「拡張ポイント」を提供し、インフラ構築と同時に追加のロジックを実行できます。
-
Providerの登録:
まず、特定のLambda関数を「Provider」として定義します。これは「カスタムリソースが作成・更新・削除される時に呼び出されるハンドラー」として機能します。 -
カスタムリソースの定義:
次に、CloudFormationテンプレートに「カスタムリソース」を記述します。
このリソースが「作成(Create)」「更新(Update)」「削除(Delete)」されるたび、対応するライフサイクルイベントがProvider経由でLambda関数に通知され、所定の処理を実行します。
結果として、cdk deploy
時に新規リソース作成や更新が行われると、デプロイ途中でLambdaがトリガーされ、DBマイグレーションなどの任意処理を自動的に実行できるようになります。
CDKで各リソースを宣言しているコードが以下です。
const handler = new lambda_python.PythonFunction(this, 'dbMigrationFunction', {
functionName: 'db-migration-function',
runtime: lambda.Runtime.PYTHON_3_11,
entry: path.join(__dirname, 'lambda-functions', 'db_migration'),
handler: 'lambda_handler',
vpc: vpc,
vpcSubnets: { subnetType: SubnetType.PRIVATE_ISOLATED },
environment: {
DB_SECRET_NAME: dbSecret.secretArn,
},
timeout: cdk.Duration.seconds(300)
});
db.connections.allowDefaultPortFrom(handler);
dbSecret.grantRead(handler);
const provider = new Provider(this, "Provider", {
onEventHandler: handler,
});
new CustomResource(this, "Custom::Migration", {
serviceToken: provider.serviceToken,
properties: { DummyValue: Date.now().toString() }, // ダミーのプロパティを設定してリソースの更新をトリガー
});
CustomResourceのpropertiesに現在時刻のダミー値を持たせて、デプロイのたびにDBマイグレーションの関数が起動するようになっています。(後述しますが、反省点でもあります...)
Lambda関数の実装
DBマイグレーションを実際に行うかどうかはLambda関数の処理内で実行されるAlembicが制御しています。
以下がLambda関数のコードです。
import logging
import os
import subprocess
import json
import boto3
DB_SECRET_NAME = os.environ.get('DB_SECRET_NAME')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info(f"event: {event}")
db_secret = _get_secret_string() if DB_SECRET_NAME else {}
database = db_secret.get("database", "postgresql+psycopg2")
db_user = db_secret.get("username", "postgres")
db_pass = db_secret.get("password", "password")
db_host = db_secret.get("host", "localhost")
db_port = str(db_secret.get("port", 5432))
db_name = db_secret.get("dbname", "db_migration")
logger.info(f"URL: {database}://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}")
os.environ['DATABASE'] = database
os.environ['DB_USER'] = db_user
os.environ['DB_PASSWORD'] = db_pass
os.environ['DB_HOST'] = db_host
os.environ['DB_PORT'] = db_port
os.environ['DB_NAME'] = db_name
# alembic.iniファイルの環境変数を設定
alembic_env = {
**os.environ,
'DATABASE': database,
'DB_USER': db_user,
'DB_PASSWORD': db_pass,
'DB_HOST': db_host,
'DB_PORT': db_port,
'DB_NAME': db_name
}
# カスタムリソースの物理IDを設定
# 物理IDとは、リソースの一意な識別子で、リソースの作成時に指定する
request_type = event.get('RequestType', {})
# もともとCFNから渡されるPhysicalResourceIdを参照
physical_resource_id = event.get('PhysicalResourceId')
if request_type == 'Create':
# Createの場合のみ新しいIDを生成または決定
physical_resource_id = '0c00zzzz-8b4f-4857-a0ee-70783364aad6' # 生成した固有ID
logger.info(f"physical_resource_id: {physical_resource_id}")
logger.info(f"RequestType: {request_type}")
# RequestTypeがCreateまたはUpdateの場合はマイグレーションを実行
if request_type == 'Delete':
return {
'PhysicalResourceId': physical_resource_id,
}
# alembic.iniファイルの環境変数を設定
try:
old_version = _get_current_version(alembic_env)
logger.info(f"Current version: {old_version}")
result = subprocess.run(
["python3", "-m", "alembic", "upgrade", "head"],
env=alembic_env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
logger.info(result.stdout)
if result.returncode != 0:
error_message = f"Migration failed with stderr: {result.stderr}"
logger.error(error_message)
raise Exception(error_message)
new_version = _get_current_version(alembic_env)
logger.info(f"Migration success from {old_version} to {new_version}")
except Exception as error:
logger.info(result.stdout)
logger.error(f"Migration execution failed: {error}")
raise error # エラーを再スローしてロールバックをトリガー
return {
'PhysicalResourceId': physical_resource_id,
}
def _get_secret_string():
try:
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId=DB_SECRET_NAME)
secret_value = response.get("SecretString", "{}")
return json.loads(secret_value)
except Exception as error:
logger.error(f"Failed to get secret string: {error}")
raise error
def _get_current_version(alembic_env):
try:
result = subprocess.run(
["python3", "-m", "alembic", "current"],
env=alembic_env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result.stdout.strip()
except Exception as error:
logger.info(result.stdout)
logger.error(f"Failed to get current version: {error}")
raise error
Lambda関数内でAlembicを実行する場合、マイグレーションファイルやライブラリをLambda関数の実行環境(ランタイム)に含める必要があります。そのため、cdk deploy
の前に以下の2つの処理をnpmスクリプトで自動化します。
- Lambdaハンドラーが格納されたディレクトリに、poetryを用いて
requirements.txt
を出力する - 同ディレクトリ内にAlembic実行に必要なファイル群をコピーする
これらのファイルがLambda関数のコードと同一ディレクトリに存在すれば、CDKが自動的にそれらをLambdaの実行パッケージに含めてデプロイします。
"export-poetry": "poetry export -f requirements.txt --output lib/lambda-functions/db_migration/requirements.txt --without-hashes",
"copy-migration-files": "sh ./scripts/copy-migration-files.sh",
"deploy": "npm run export-poetry && npm run copy-migration-files && cdk deploy"
ソースコード
動作確認
何もテーブルを作成していないところにシンプルなUserテーブルを作成してみます。
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.autogenerate.compare] Detected added table 'users'
INFO [alembic.autogenerate.compare] Detected added index ''ix_users_email'' on '('email',)'
INFO [alembic.autogenerate.compare] Detected added index ''ix_users_id'' on '('id',)'
INFO [alembic.autogenerate.compare] Detected added index ''ix_users_name'' on '('name',)'
Generating /home/akis/dev/dbmigration/migrations/versions/32f8252e73b6_.py ... done
デプロイ用のnpm scriptを実行します。
$ npm run deploy
スキーマもid, name, email のカラムがあります。
さらにusersテーブルのカラム名をusernameに変更してみます。
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
username = Column(String, index=True)
email = Column(String, unique=True, index=True)
$ alembic revision --autogenerate
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.ddl.postgresql] Detected sequence named 'users_id_seq' as owned by integer column 'users(id)', assuming SERIAL and omitting
INFO [alembic.autogenerate.compare] Detected added column 'users.username'
INFO [alembic.autogenerate.compare] Detected removed index 'ix_users_name' on 'users'
INFO [alembic.autogenerate.compare] Detected added index ''ix_users_username'' on '('username',)'
INFO [alembic.autogenerate.compare] Detected removed column 'users.name'
Generating /home/akis/dev/dbmigration/migrations/versions/b4818906d0f9_.py ... done
npm run deploy
実装の反省点
カスタムリソーストリガーの改善
今回は、カスタムリソースのproperties
に毎回異なるダミー値を与えて更新イベントを発生させ、そのたびにLambda関数を起動してマイグレーションを実行する方法を取っています。
しかし、この方法だとDBスキーマに変更がない場合でもLambdaが実行されるため、無駄な実行が発生します。Alembic側で変更がなければ実際にはスキーマ更新は起こりませんが、リソースやコストの観点から、スキーマ変更があった場合のみカスタムリソースが更新されるような仕組みに改良すべきでした。
参考