DynamoDBからAurora MySQLへデータ移行することがあったので、その方法を書いていきます
データ移行といっても1つのテーブルに保存されているデータを移行しただけなので、全テーブルのデータ移行ではないことはご了承ください
方法
DynamoDBからAurora MySQLへ移行する方法はいくつか考えられると思いますが、手っ取り早くかつ確実に移行するために、使い捨て(?)のPythonでコードを書きました
使い捨てといってもまたデータ移行することがあるかもしれないので使い回せるようにしました
実装
- python3.8 (pipenv)
- pymysql
- boto3
コード
まずはDynamoDBとAurora MySQLのモデルを作成します
標準ライブラリのdataclassを使用すると簡潔かつ綺麗に書くことができます
あとポイントとして、DynamoDBのテーブルから全件取得する際に制限に引っ掛かり一回で取得できない場合を考慮して、scan関数では再帰的に実行するようになっています
公式ドキュメントより
A single Scan operation reads up to the maximum number of items set (if using the Limit parameter) or a maximum of 1 MB of data and then apply any filtering to the results using FilterExpression . If LastEvaluatedKey is present in the response, you need to paginate the result set.
from decimal import Decimal
from dataclasses import dataclass, asdict
from typing import Any, ClassVar, Dict, List
import boto3
from src.utils import convert_to_datetime
@dataclass
class BaseDynamoDB:
table_name: ClassVar[str]
hash_key: ClassVar[str]
@classmethod
def get_client(cls) -> Any:
client = boto3.resource("dynamodb")
return client.Table(cls.table_name)
@classmethod
def scan(
cls, *, recursive: bool = False, exclusive_start_key: Dict = {},
) -> List["BaseDynamoDB"]:
"""DynamoDB Tableからアイテムを一部または全件取得する
全件取得するためにはrecursiveをTrueにする
Args:
recursive (bool):
デフォルトはFalseで一部のデータを取得する
全件取得するためにはTrueにする
exclusive_start_key (Dict): スキャンする最初のアイテムのプライマリキー
Returns:
List["BaseDynamoDB"]: テーブルモデルのインスタンスリスト
"""
client = cls.get_client()
options = {}
if exclusive_start_key:
options.update(exclusive_start_key)
response = client.scan(**options)
items = list(map(lambda item: cls(**item), response["Items"])) # type: ignore
if recursive and "LastEvaluatedKey" in response:
tmp = cls.scan(
recursive=True,
exclusive_start_key=response["LastEvaluatedKey"],
)
items.extend(tmp)
return items
@dataclass
class Qiita(BaseDynamoDB):
"""Qiita用の架空テーブル"""
table_name: ClassVar[str] = "qiita"
hash_key: ClassVar[str] = "user_id"
user_id: str
created_at: int
updated_at: int
memo: str = ""
def __post_init__(self) -> None:
for attr in ("updated_at", "created_at"):
v = getattr(self, attr)
if isinstance(v, Decimal):
setattr(self, attr, convert_to_datetime(str(v)))
def to_dict(self) -> Dict[str, Any]:
"""インスタンスを辞書にして返す
Returns:
Dict[str, Any]
"""
return asdict(self)
from datetime import datetime
from dataclasses import asdict, dataclass, field, InitVar
from typing import Any, ClassVar, Dict
@dataclass
class Qiita:
"""Qiita用の架空テーブル"""
table_name: ClassVar[str] = "qiita"
primary_key: ClassVar[str] = "user_id"
user_id: str
# DynamoDBの日時管理カラム
created_at: InitVar[datetime]
updated_at: InitVar[datetime]
# Aurora MySQLの日時管理カラム
registration_date: datetime = field(init=False)
update_date: datetime = field(init=False)
memo: str = ""
registration_id: str = "DynamoDB"
update_id: str = "DynamoDB"
def __post_init__(
self, created_at: datetime, updated_at: datetime
) -> None:
self.registration_date = created_at
self.update_date = updated_at
def to_dict(self) -> Dict[str, Any]:
"""インスタンスを辞書にして返す
Returns:
Dict[str, Any]
"""
result = asdict(self)
result["registration_date"] = self.registration_date
result["update_date"] = self.update_date
return result
import os
from contextlib import contextmanager
from typing import Iterator
import pymysql
from pymysql.connections import Connection
from src.utils import get_logger
logger = get_logger(__name__)
AURORA_DB = os.environ["AURORA_DB"]
AURORA_HOST = os.environ["AURORA_HOST"]
AURORA_PORT = int(os.environ["AURORA_PORT"])
AURORA_USER = os.environ["AURORA_USER"]
AURORA_PASSWORD = os.environ["AURORA_PASSWORD"]
@contextmanager
def connect() -> Iterator[Connection]:
"""Aurora MySQLとコネクション確立
Returns:
Iterator[Connection]
"""
try:
conn = pymysql.connect(
db=AURORA_DB,
host=AURORA_HOST,
port=AURORA_PORT,
user=AURORA_USER,
password=AURORA_PASSWORD,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=120,
)
except Exception as err:
logger.error("Auraroとの接続失敗")
raise err
try:
yield conn
finally:
conn.close()
以下がメインスクリプトです
モデル名を変えればいくらでも使い回しがきくようになっています
from src.db.connection import connect
from src.db.sql import INSERT_QIITA
from src.models.dynamodb import Qiita as DynamoDBQiita
from src.models.mysql import Qiita as MySQLQiita
from src.utils import get_logger
logger = get_logger(__name__)
def main():
logger.info("START")
items = DynamoDBQiita.scan(recursive=True)
logger.info(f"{len(items)}件取得完了")
params = list(
map(
lambda item: MySQLQiita(**item.to_dict()).to_dict(),
items,
)
)
try:
with connect() as conn:
with conn.cursor() as cursor:
cursor.executemany(INSERT_QIITA, params)
count = cursor.rowcount
conn.commit()
except Exception as err:
logger.error(err)
logger.error("INSERT失敗")
else:
logger.info(f"{count}件INSERT成功")
logger.info("END")
if __name__ == "__main__":
main()
移行
前準備
IAM
DynamoDBのReadOnly Roleを持ったIAM Userを作成して、Access Key IDとSecret Access Keyを取得しておいて下さい
Aurora MySQLは普通のMySQLと同様にusernameとpasswordで認証するので不要です
.env
pipenvにクレデンシャルデータを読み込ませるために.envファイルを用意します
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
AURORA_DB=
AURORA_HOST=
AURORA_PORT=
AURORA_USER=
AURORA_PASSWORD=
Makefile
ステージングと本番でも同じスクリプトを使用してかつ、簡潔なコマンドを実行するだけでよくなるようにMakefileを書きました
pipenvでpythonスクリプトを実行するにあたり、シークレットキーは.envファイルから読み込むようにしたいので、.envから環境ごとの.env.$(stage)ファイルへシンボリックリンクを張りました
stage = stg
.PHONY: dymy
dymy:
ln -sf .env.$(stage) .env
pipenv run python -m src.main
余談ですが、pipenvでpythonスクリプトを実行したい場合はPipfileのscriptsセクションに書けばいいのですが、例えば今回のように他の処理もさせたい時にMakefile書くのはオススメです
まぁ、shell scriptでもいいですが個人的にはMakefileの方が好きです
さらばDynamoDB、ようこそAurora MySQL
本番環境で実行する場合はstage引数にprodを指定します
make dymy stage=prod
これでDynamoDBからAurora MySQLへデータ移行完了です
本当に移行できたか確認したい場合は移行元のDynamoDBのテーブルに保管されていた項目数と、移行先のAurora MySQLでSELECT count(*) FROM ~
を実行して取得できたカウント数が一致しているか見て下さい