3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

【AWS】DynamoDBからRDS Aurora (MySQL) へデータ移行する

Last updated at Posted at 2020-05-09

DynamoDBからAurora MySQLへデータ移行することがあったので、その方法を書いていきます
データ移行といっても1つのテーブルに保存されているデータを移行しただけなので、全テーブルのデータ移行ではないことはご了承ください

screenshot 2020-05-09 12.40.26.png

方法

DynamoDBからAurora MySQLへ移行する方法はいくつか考えられると思いますが、手っ取り早くかつ確実に移行するために、使い捨て(?)のPythonでコードを書きました
使い捨てといってもまたデータ移行することがあるかもしれないので使い回せるようにしました

実装

  • python3.8 (pipenv)
  • pymysql
  • boto3

コード

まずはDynamoDBとAurora MySQLのモデルを作成します
標準ライブラリのdataclassを使用すると簡潔かつ綺麗に書くことができます

あとポイントとして、DynamoDBのテーブルから全件取得する際に制限に引っ掛かり一回で取得できない場合を考慮して、scan関数では再帰的に実行するようになっています

公式ドキュメントより

A single Scan operation reads up to the maximum number of items set (if using the Limit parameter) or a maximum of 1 MB of data and then apply any filtering to the results using FilterExpression . If LastEvaluatedKey is present in the response, you need to paginate the result set.

python:models/dynamodb.py
from decimal import Decimal
from dataclasses import dataclass, asdict
from typing import Any, ClassVar, Dict, List

import boto3

from src.utils import convert_to_datetime


@dataclass
class BaseDynamoDB:
    table_name: ClassVar[str]
    hash_key: ClassVar[str]

    @classmethod
    def get_client(cls) -> Any:
        client = boto3.resource("dynamodb")
        return client.Table(cls.table_name)

    @classmethod
    def scan(
        cls, *, recursive: bool = False, exclusive_start_key: Dict = {},
    ) -> List["BaseDynamoDB"]:
        """DynamoDB Tableからアイテムを一部または全件取得する

        全件取得するためにはrecursiveをTrueにする

        Args:
            recursive (bool):
                デフォルトはFalseで一部のデータを取得する
                全件取得するためにはTrueにする
            exclusive_start_key (Dict): スキャンする最初のアイテムのプライマリキー

        Returns:
            List["BaseDynamoDB"]: テーブルモデルのインスタンスリスト
        """

        client = cls.get_client()
        options = {}

        if exclusive_start_key:
            options.update(exclusive_start_key)

        response = client.scan(**options)
        items = list(map(lambda item: cls(**item), response["Items"]))  # type: ignore

        if recursive and "LastEvaluatedKey" in response:
            tmp = cls.scan(
                recursive=True,
                exclusive_start_key=response["LastEvaluatedKey"],
            )
            items.extend(tmp)

        return items


@dataclass
class Qiita(BaseDynamoDB):
    """Qiita用の架空テーブル"""

    table_name: ClassVar[str] = "qiita"
    hash_key: ClassVar[str] = "user_id"

    user_id: str
    created_at: int
    updated_at: int
    memo: str = ""

    def __post_init__(self) -> None:
        for attr in ("updated_at", "created_at"):
            v = getattr(self, attr)
            if isinstance(v, Decimal):
                setattr(self, attr, convert_to_datetime(str(v)))

    def to_dict(self) -> Dict[str, Any]:
        """インスタンスを辞書にして返す

        Returns:
            Dict[str, Any]
        """

        return asdict(self)
models/aurora.py
from datetime import datetime
from dataclasses import asdict, dataclass, field, InitVar
from typing import Any, ClassVar, Dict


@dataclass
class Qiita:
    """Qiita用の架空テーブル"""

    table_name: ClassVar[str] = "qiita"
    primary_key: ClassVar[str] = "user_id"

    user_id: str

    # DynamoDBの日時管理カラム
    created_at: InitVar[datetime]
    updated_at: InitVar[datetime]
    # Aurora MySQLの日時管理カラム
    registration_date: datetime = field(init=False)
    update_date: datetime = field(init=False)

    memo: str = ""
    registration_id: str = "DynamoDB"
    update_id: str = "DynamoDB"

    def __post_init__(
        self, created_at: datetime, updated_at: datetime
    ) -> None:
        self.registration_date = created_at
        self.update_date = updated_at

    def to_dict(self) -> Dict[str, Any]:
        """インスタンスを辞書にして返す

        Returns:
            Dict[str, Any]
        """

        result = asdict(self)
        result["registration_date"] = self.registration_date
        result["update_date"] = self.update_date

        return result
connection.py
import os
from contextlib import contextmanager
from typing import Iterator

import pymysql
from pymysql.connections import Connection

from src.utils import get_logger


logger = get_logger(__name__)


AURORA_DB = os.environ["AURORA_DB"]
AURORA_HOST = os.environ["AURORA_HOST"]
AURORA_PORT = int(os.environ["AURORA_PORT"])
AURORA_USER = os.environ["AURORA_USER"]
AURORA_PASSWORD = os.environ["AURORA_PASSWORD"]


@contextmanager
def connect() -> Iterator[Connection]:
    """Aurora MySQLとコネクション確立

    Returns:
        Iterator[Connection]
    """

    try:
        conn = pymysql.connect(
            db=AURORA_DB,
            host=AURORA_HOST,
            port=AURORA_PORT,
            user=AURORA_USER,
            password=AURORA_PASSWORD,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
            connect_timeout=120,
        )
    except Exception as err:
        logger.error("Auraroとの接続失敗")
        raise err

    try:
        yield conn
    finally:
        conn.close()

以下がメインスクリプトです
モデル名を変えればいくらでも使い回しがきくようになっています

main.py
from src.db.connection import connect
from src.db.sql import INSERT_QIITA
from src.models.dynamodb import Qiita as DynamoDBQiita
from src.models.mysql import Qiita as MySQLQiita
from src.utils import get_logger

logger = get_logger(__name__)


def main():
    logger.info("START")

    items = DynamoDBQiita.scan(recursive=True)
    logger.info(f"{len(items)}件取得完了")

    params = list(
        map(
            lambda item: MySQLQiita(**item.to_dict()).to_dict(),
            items,
        )
    )

    try:
        with connect() as conn:
            with conn.cursor() as cursor:
                cursor.executemany(INSERT_QIITA, params)
                count = cursor.rowcount
            conn.commit()
    except Exception as err:
        logger.error(err)
        logger.error("INSERT失敗")
    else:
        logger.info(f"{count}件INSERT成功")

    logger.info("END")


if __name__ == "__main__":
    main()

移行

前準備

IAM

DynamoDBのReadOnly Roleを持ったIAM Userを作成して、Access Key IDとSecret Access Keyを取得しておいて下さい
Aurora MySQLは普通のMySQLと同様にusernameとpasswordで認証するので不要です

.env

pipenvにクレデンシャルデータを読み込ませるために.envファイルを用意します

.env
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
AURORA_DB=
AURORA_HOST=
AURORA_PORT=
AURORA_USER=
AURORA_PASSWORD=

Makefile

ステージングと本番でも同じスクリプトを使用してかつ、簡潔なコマンドを実行するだけでよくなるようにMakefileを書きました
pipenvでpythonスクリプトを実行するにあたり、シークレットキーは.envファイルから読み込むようにしたいので、.envから環境ごとの.env.$(stage)ファイルへシンボリックリンクを張りました

Makefile
stage = stg

.PHONY: dymy
dymy:
	ln -sf .env.$(stage) .env
	pipenv run python -m src.main

余談ですが、pipenvでpythonスクリプトを実行したい場合はPipfileのscriptsセクションに書けばいいのですが、例えば今回のように他の処理もさせたい時にMakefile書くのはオススメです
まぁ、shell scriptでもいいですが個人的にはMakefileの方が好きです

さらばDynamoDB、ようこそAurora MySQL

本番環境で実行する場合はstage引数にprodを指定します

make dymy stage=prod

これでDynamoDBからAurora MySQLへデータ移行完了です
本当に移行できたか確認したい場合は移行元のDynamoDBのテーブルに保管されていた項目数と、移行先のAurora MySQLでSELECT count(*) FROM ~を実行して取得できたカウント数が一致しているか見て下さい

参考

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?