0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ECS + EventBridge + DynamoDBでAurora MySQLの月次データ削除バッチを自動化する

0
Last updated at Posted at 2024-09-15

Aurora MySQLの不要データを削除するにあたり、データ数が多すぎて手動での削除は効率が悪いので、自動化を検討しました。今回は月単位のデータを日毎にバッチ処理で自動削除する仕組みを構築しました。

当初はEventBridge + Lambdaでバッチ処理を行うつもりだったのですが、よく考えたらLambdaの最大実行時間(15分)以内に処理が完了しないことに気がついたので、Lambdaの使用を諦めました。

そこで、ECS on FargateのタスクをEventBridgeのスケジューラーで毎日定期的に起動する構成に変更しました。

また、どのデータを削除したかの進捗管理のために、無料枠でいけそうなDynamoDBを使ってみることにしました。

構築手順

1. DynamoDBテーブルの作成

削除進捗を管理するためにDynamoDBを使用します。以下の設定でテーブルを作成します(他はデフォルトのまま)。

  • テーブル名: deletion_progress
  • パーティションキー: id (文字列)

スクリーンショット 2024-09-15 8.37.17.png

2. DynamoDB操作用のIAMポリシー作成

ECSタスクからDynamoDBへのデータ取得・保存・更新が必要なため、DynamoDB操作用のIAMポリシーを作成し、ECSタスク実行ロール(ecsTaskExecutionRole)にアタッチします。

以下のポリシーを作成し、ecsTaskExecutionRoleにアタッチします。Resourceには先ほど作成したDynamoDBテーブルのARNを指定します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem"
            ],
            "Resource": "arn:aws:dynamodb:your-region:your-account-id:table/your-table-name"
        }
    ]
}

3. ECS用セキュリティグループの作成

ECSタスクからAuroraとDynamoDBにアクセスできるように、セキュリティグループを設定します。Auroraにはポート3306、DynamoDBにはポート443を許可します。

以下は参考のTerraformコードです。

resource "aws_security_group_rule" "ecs_sg_egress_rds" {
  type                     = "egress"
  from_port                = 3306
  to_port                  = 3306
  protocol                 = "tcp"
  security_group_id        = aws_security_group.ecs_sg.id
  source_security_group_id = aws_security_group.rds_sg.id
}

resource "aws_security_group_rule" "ecs_sg_egress_https" {
  type              = "egress"
  from_port         = 443
  to_port           = 443
  protocol          = "tcp"
  security_group_id = aws_security_group.ecs_sg.id
  cidr_blocks       = ["0.0.0.0/0"]
}

4. ECRリポジトリの作成

ECSタスク用のDockerイメージを保管するため、ECRリポジトリdaily-deletionを作成します。

スクリーンショット 2024-09-15 8.59.14.png

5. アプリケーションコードの作成(Python)

Aurora MySQLのoperationsテーブルから、2022年4月~2024年5月のレコードを月単位で削除するスクリプトです。DynamoDBで削除進行状況(last_year, last_month)を保存し、次回の処理に利用します。

main.py
import os
import decimal
import pymysql
import boto3
from datetime import datetime, timedelta
from logging import INFO, Formatter, StreamHandler, getLogger

RDS_HOST = os.getenv('RDS_HOST')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_NAME = os.getenv('DB_NAME')
DYNAMODB_TABLE = os.getenv('DYNAMODB_TABLE')


dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)

logger = getLogger()
logger.setLevel(INFO)

if not logger.hasHandlers():
    handler = StreamHandler()
    logger.addHandler(handler)

    formatter = Formatter(
        '[%(levelname)s]\t%(asctime)s\t%(message)s\n',
        '%Y-%m-%d %H:%M:%S'
    )

    for handler in logger.handlers:
        handler.setFormatter(formatter)


def connect_to_db():  # MySQLに接続する関数
    try:
        logger.info("Attempting to connect to the database with retries")
        connection = pymysql.connect(host=RDS_HOST,
                                     port=3306,
                                     user=DB_USER,
                                     password=DB_PASSWORD,
                                     db=DB_NAME,
                                     cursorclass=pymysql.cursors.DictCursor)
        logger.info("Database connection established successfully")
        return connection
    except pymysql.MySQLError as e:
        logger.error(f"Database connection failed: {e}")
        raise


def get_deletion_progress():  # 進行状況をDynamoDBから取得
    response = table.get_item(Key={'id': 'progress'})
    if 'Item' in response:
        last_year = int(response['Item']['last_year'])
        last_month = int(response['Item']['last_month'])

        logger.info(
            f"Retrieved from DynamoDB - last_year: {last_year} (type: {type(last_year)}), last_month: {last_month} (type: {type(last_month)})")

        return last_year, last_month
    else:
        # 初回の場合は2022年4月を設定(進行状況がない場合はまだ何も削除されていない)
        return 2022, 3  # 2022年4月を最初に削除するため、初回は3月として扱う


def update_deletion_progress(year, month):  # 進行状況をDynamoDBに保存
    table.put_item(Item={
        'id': 'progress',
        'last_year': year,
        'last_month': month
    })


def delete_records(year, month):  # 月単位でレコードを削除する関数
    try:
        logger.info(
            f"Starting record deletion for year: {year}, month: {month}")

        connection = connect_to_db()
        with connection.cursor() as cursor:
            start_date = f'{year}-{month:02d}-01'
            logger.info(f"Start Date generated: {start_date}")
            end_date = (datetime(year, month, 1) + timedelta(days=32)
                        ).replace(day=1).strftime('%Y-%m-%d')
            logger.info(
                f"Deleting records for the period {start_date} to {end_date}")

            delete_query = f"""
                DELETE {DB_NAME}.operations
                FROM {DB_NAME}.operations
                WHERE operations.created_at >= %s
                AND operations.created_at < %s;
            """

            cursor.execute(
                delete_query, (start_date, end_date))

            connection.commit()
            logger.info(f"Deleted records for {year}-{month:02d}")
    except Exception as e:
        logger.error(f"Error deleting records: {e}")
    finally:
        if connection:
            connection.close()


def main():
    last_year, last_month = get_deletion_progress()
    logger.info(
        f"get_deletion_progress: {last_year}, month: {last_month}")

    if last_month == 12:
        next_year = last_year + 1
        next_month = 1
    else:
        next_year = last_year
        next_month = last_month + 1

    if next_year > 2024 or (next_year == 2024 and next_month > 5):
        logger.info(
            "Deletion process has completed. No more records to delete.")
        return

    try:
        delete_records(next_year, next_month)

        update_deletion_progress(next_year, next_month)
        logger.info(
            f"Deletion progress updated to {next_year}-{next_month:02d}")

    except Exception as e:
        logger.error(f"Error during deletion process: {e}")
        import traceback
        logger.error(traceback.format_exc())


if __name__ == "__main__":
    main()

セキュリティのため、MySQL接続情報は環境変数から読み込みます。

connection = pymysql.connect(host=RDS_HOST,
                             port=3306,
                             user=DB_USER,
                             password=DB_PASSWORD,
                             db=DB_NAME,

また、DynamoDBから取得したlast_yearlast_monthがDecimal型になっているので、Pythonで数値計算できるようにIntに変換します。

last_year = int(response['Item']['last_year'])
last_month = int(response['Item']['last_month'])

6. Dockerイメージのビルドおよびプッシュ

ECSタスクで参照するDockerイメージをローカル環境でビルドし、先ほど作成したECRリポジトリにプッシュします。

まず、main.pyと同階層にDockerfilerequiments.txtを作成します。

FROM python:3.12-slim

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "./main.py"]
requiments.txt
boto3
pymysql

ローカル環境でビルドします。buildxをつかっているのは、自環境がMac M2(Arm)のためです。

docker buildx build --platform linux/amd64 -f Dockerfile -t <account_id>.dkr.ecr.ap-northeast-1.amazonaws.com/daily-deletion:latest .

ECR認証します。

aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin <accound_id>.dkr.ecr.ap-northeast-1.amazonaws.com

ビルドしたイメージをプッシュします。

docker push <accound_id>.dkr.ecr.ap-northeast-1.amazonaws.com/daily-deletion:latest

プッシュしたらECRリポジトリを確認します。
スクリーンショット 2024-09-15 9.26.14.png

7. ECSタスク定義の作成

JSONでタスク定義を作成します。
アプリケーションで使用する環境変数はenvironmentに定義します。
また、データ量によってはmemorycpuをもっと小さい値にできます(検証してみないとわからないですが)。
あと、"awslogs-create-group": "true”がないと初回のタスク実行でCloudWatchロググループの作成に失敗してエラーが出るので、注意が必要です。

{
  "family": "daily-deletion-task",
  "networkMode": "awsvpc",
  "containerDefinitions": [
    {
      "name": "daily-deletion-container",
      "image": "<account_id>.dkr.ecr.ap-northeast-1.amazonaws.com/daily-deletion:latest",
      "essential": true,
      "memory": 512,
      "cpu": 256,
      "environment": [
        { "name": "RDS_HOST", "value": "<cluster_endpoint_name>" },
        { "name": "DB_USER", "value": "<mysql_user_name>" },
        { "name": "DB_PASSWORD", "value": "<mysql_user_password>" },
        { "name": "DB_NAME", "value": "<db_schema_name>" },
        { "name": "DYNAMODB_TABLE", "value": "deletion_progress" }
      ],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "/ecs/daily-deletion",
          "awslogs-create-group": "true”,
          "awslogs-region": "ap-northeast-1",
          "awslogs-stream-prefix": "ecs"
        }
      }
    }
  ],
  "requiresCompatibilities": ["FARGATE"],
  "cpu": "256",
  "memory": "512",
  "taskRoleArn": "arn:aws:iam::<account_name>:role/ecsTaskRole",
  "executionRoleArn": "arn:aws:iam::<account_name>:role/ecsTaskExecutionRole"
}

スクリーンショット 2024-09-15 9.43.30.png

8. EventBridgeスケジュールの作成

これまでECSタスクを立ち上げる前にECSサービスを構築していたので、今回もそうするものだと思いこんでいたのですが、バッチ処理をする際はEventBridgeから直接ECSタスクを立ち上げるということを初めて知りました。

「EventBridge > スケジュール」からスケジュールパターン(今回は毎日AM3:00)、タスク定義、サブネット、セキュリティグループなどを選択します。

スクリーンショット 2024-09-16 7.24.37.png

スクリーンショット 2024-09-16 7.28.55.png

動作確認

CloudWatch Logsでレコードが正しく削除されたことを確認しました。

スクリーンショット 2024-09-16 7.39.01.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?