9
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

LAPRASAdvent Calendar 2019

Day 2

【開発環境】本番DBに近いデータセットの作り方

Last updated at Posted at 2019-12-01

概要

Webサービスを作っていると、手元の開発環境でも本番環境のデータベースに近いデータが欲しくなりませんか?なりますよね。
本番DBで実行したデータの整合性を保つためのパッチを自分のローカルの環境では当て忘れていた…等で手元のデータが期待していない形になっていることもあるでしょう。
また、シードデータを用意するのも一つの手ですが、シードデータのメンテナンスに意外とコストがかかったり、データの特性が本番データと違うことが影響してリリースしてみたらパフォーマンスに問題が…ということも起きます。

ということで、本番DBから部分的にデータを取り出してローカルで使うということをしてみたので、そのスクリプトと共にやったことの要約を書いていきます。
スクリプトはPythonで書いているので、Python読めない方はごめんなさい…

超大前提として、本番データを何も加工せずにローカルにもってくると法律的にヤバいサービスはたくさんあると思うので、適切なマスキング処理を必ず入れてください。
マスキングの方法はデータの特性にも依るのでこの記事では詳細に触れません。

全体の流れ

  1. 本番DBのテーブルの部分集合となるテーブルを作成する
  2. 部分集合のテーブルのdumpを作成する
  3. ローカルでそのdumpをimportする

注意)
本番DBと書いていますが、当然本番稼働しているDBサーバーには影響のない場所で行ってください。
スナップショットから復元した別のサーバー上で作業することを想定しています。
もし本番環境でやらかしてしまった場合には来年の 本番環境でやらかしちゃった人 Advent Calendar にぜひエントリーしてください!

1. 本番DBのテーブルの部分集合となるテーブルを作成する

ここが一番考えることが多い部分です。
分かりやすいように例を出して説明します。本に関する管理サービスを考えた時に books, users, favs の3テーブルがあり、favsからbooksとusersにForeign Keyを張っているとします。
部分集合となるテーブルの作成と言っているのは、本番DBにbooks: 100万, users: 100万, favs: 500万 のレコードあった時にローカルで使いたいデータ books: 1万, users: 1万, favs: 5万 を持つテーブルを作成することを指しています。
部分集合のテーブルを便宜的にそれぞれ books_small, users_small, favs_small と呼ぶことにします。

CREATE TABLE books_small SELECT * FROM books LIMIT 10000;
CREATE TABLE users_small SELECT * FROM users LIMIT 10000;
CREATE TABLE favs_small SELECT * FROM favs LIMIT 50000;

とやりたくなりますが、favs_small には books_small と users_small に対する Foreign Key の制約があるためこれだけでは成功しません。(favs_small には users_small に含まれないが users には含まれるユーザが入っている可能性があるため)

さらに今回の例ではテーブルの依存関係が単純なので book, users の後に favs を実行すれば依存先がすでに作られていて問題とならないことが明らかですが、テーブル間の依存関係の解決を100以上のテーブルに対して人間が行うのは現実的ではありません。

つまり課題としては下の2つがあり、これを満たすようにして部分集合のテーブルを作成していきます。
i. 部分集合のテーブル作成時に Foreign Key の依存先から作りたい
ii. 部分集合のテーブルにデータを入れる時に Foreign Key の制約を守りたい

i. 部分集合のテーブル作成時に Foreign Key の依存先から作りたい

これを達成するためには依存されている側からテーブルを作っていく必要があります。
どの順にテーブルを作っていけばよいのかを返してくれる get_table_list() の関数を定義してみます。

from typing import List

import MySQLdb.cursors

global db
db = MySQLdb.connect()

def get_table_list() -> List[str]:
    """
    データの依存関係も意識しながらcreate tableやinsertしていく順番にテーブル名を返す
    """
    global db

    def _get_list_of_referring_tables(table_name) -> List[str]:
        """
        `show create table` をして Foreign Key を張っているテーブル名の一覧を取得
        依存先のテーブルの依存先も見に行くために再帰的に処理する
        """
        tables = [table_name]
        cur.execute(f'show create table {table_name}')
        statement = cur.fetchone()['Create Table']
        references = re.findall(r'REFERENCES `(\w+)`', statement)
        if references:
            for referring_table in references:
                tables = _get_list_of_referring_tables(referring_table) + tables  # 依存しているのが前
        return tables

    # `show tables` して取得したテーブル一覧を table_list に入れる。(依存関係は気にしない)
    cur = db.cursor()
    cur.execute("show tables")
    rows = cur.fetchall()
    table_list = []
    for row in rows:
        table_list.append(row['Tables_in_[database]'])

    # 依存されているテーブルが必ず前に来るように、順番に意味をもたせたテーブル一覧 (テーブル名の重複許可)
    table_list_order_by_referred = []  
    for table_name in table_list:
        table_list_order_by_referred += _get_list_of_referring_tables(table_name)

    # table_list_order_by_referred には重複してテーブル名が入っているので重複を取り除く
    # 前から順番に重複を消していくことで依存されているものが先に来る
    unique_table_list_order_by_referred = []
    for table_name in table_list_order_by_referred:
        if table_name not in unique_table_list_order_by_referred:
            unique_table_list_order_by_referred.append(table_name)
    return unique_table_list_order_by_referred

これで get_table_list() で得られるテーブル順に

CREATE TABLE books_small SELECT * FROM books LIMIT 10000;

的なことをしていけばテーブル間の依存関係は解決です。

ii. 部分集合のテーブルにデータを入れる時に Foreign Key の制約を守りたい

続いて、データを入れる時の依存関係の解決方法です。
先程も書きましたが何も考えずに

CREATE TABLE books_small SELECT * FROM books LIMIT 10000;
CREATE TABLE users_small SELECT * FROM users LIMIT 10000;
CREATE TABLE favs_small SELECT * FROM favs LIMIT 50000;

をすると Cannot add or update a child row: a foreign key constraint fails なエラーで怒られます。
favs_small には books_small と users_small に入っている book と user だけのレコードが入ってほしい訳ですね。

選択肢としては以下の2つかと思います。

  • create table するときの insert 部分で制限する
CREATE TABLE favs_small 
SELECT * 
FROM favs 
WHERE book_id IN (SELECT id FROM books_small) 
AND user_id IN (SELECT id FROM users_small)
LIMIT 50000;
  • Foreign Key のチェックを外して入れてから、不要なデータを削除してチェックを復活させる
SET FOREIGN_KEY_CHECKS = 0
CREATE TABLE favs_small SELECT * FROM favs LIMIT 50000;
DELETE FROM favs_small WHERE book_id NOT IN (SELECT id FROM books_small);
DELETE FROM favs_small WHERE user_id NOT IN (SELECT id FROM users_small);
SET FOREIGN_KEY_CHECKS = 1

どちらでも良いのですがSQL文を組み立てるコストが後者の方が低く感じたので今回はそちらのアプローチにしました。

ちなみに

DELETE FROM favs_small WHERE book_id NOT IN (SELECT id FROM books_small);

は少なくともMySQLで実行する場合、DELETE ... NOT IN ... の実行計画の組み立てが下手なのかものすごく時間がかかるので

SELECT id FROM favs_small WHERE book_id NOT IN (SELECT id FROM books_small);
DELETE FROM favs_small WHERE id IN ([上で取得したidのリスト]);

の2つのクエリに分解して実行すると早くて嬉しいです。

ということでそれをPythonで実現するとこんな感じのコードになります。


# 各テーブルのレコード数上限をこんな感じで定義しておく
TABLE_RECORD_LIMIT = {
  'users': 10000,
  'books': 10000,
  'favs': 50000,
}


def create_small_table():
    """
    [table_name]_small というテーブルを作って、そこにdumpする対象のデータを入れていく。
    """
    global db

    table_list = get_table_list()
    cur = db.cursor()
    for table_name in table_list:
        small_table_name = get_small_table_name(table_name)
        cur.execute(f'SHOW CREATE TABLE {table_name}')
        table_meta_data = cur.fetchone()['Create Table']
        # `table_name` が依存しているテーブルの名前の一覧を取得
        references = re.findall(r'REFERENCES `(\w+)`', table_meta_data)

        limit_statement = ''
        if table_name in TABLE_RECORD_LIMIT:
            limit_statement = f'LIMIT {TABLE_RECORD_LIMIT[table_name]}'

        cur.execute('SET FOREIGN_KEY_CHECKS = 0')
        cur.execute(f'CREATE TABLE {small_table_name} SELECT * FROM {table_name} {limit_statement}')
        for parent_table in references:
            small_parent_table = get_small_table_name(parent_table)
            reference_column_name = get_reference_column_name(table_meta_data, parent_table)
            cur.execute(f"""
            SELECT id 
            FROM {small_table_name} 
            WHERE {reference_column_name} NOT IN (SELECT id FROM {small_parent_table})
            """)
            delete_id_list = ','.join([str(row['id']) for row in cur.fetchall()])
            if delete_id_list:
                cur.execute(f'DELETE FROM {small_table_name} WHERE id IN ({delete_id_list})')
        cur.execute('SET FOREIGN_KEY_CHECKS = 1')


def get_small_table_name(original_table_name):
    """
    好きなように実装してもらって良いですが
    元のテーブル名よりも長いものを返すとテーブル名の最大長に違反する可能性があるので注意です
    """
    return original_table_name + '_small'
    # return original_table_name[:-2] + '_s'  # 私はこちらで実装しました


def get_reference_column_name(table_meta_data, referring_table_name):
    """
    `SHOW CREATE TABLE` で取得したテーブルのメタデータ(table_meta_data)から、
    参照先のテーブル(referring_table_name)を指しているカラム名を取得する
    """
    return re.findall(r'\(`(\w+)`\) REFERENCES `' + referring_table_name, table_meta_data)[0]

注意点としては、favs の取得件数上限を50000と一番最初に定義していますが、50000個取ってきた後に制約違反のレコードをdeleteしているので実際に残るのは50000件以下になります。
厳密に50000件取りたい場合には、先ほど説明した2つある選択肢のうち1つめの手法をとると実現できます。

2.部分集合のテーブルのdumpを作成する

ここまででForeign Keyの整合性が取れた部分集合なテーブルができたので、あとは何も考えずにそのテーブルのダンプを取るだけです。
mysqldump を使いたい人は show tables した結果に対して部分集合なテーブルのpost-fixである _small とかでgrepして

$ mysqldump -u user -p [database] books_small users_small favs_small hoge_small .... > hoge.dump

なコマンドを組み立てればよいです。

dumpするところも自分で書きたい人は頑張りましょう、こんな感じで書けます。
マスキングする処理はココで入れると便利かと思います。

from functools import lru_cache

def create_small_db_dump():
    global db

    cur = db.cursor()
    table_list = get_table_list()
    BATCH_SIZE = 30000
    for table_name in table_list:
        small_table_name = get_small_table_name(table_name)
        offset = 0
        while True:
            cur.execute(f'SELECT * FROM {small_table_name} LIMIT {BATCH_SIZE} OFFSET {offset}')
            rows = cur.fetchall()
            if not rows:
                break
            create_insert_statement(table_name, rows)
            offset += batch_size


def create_insert_statement(table_name, rows):
    """
    :param table_name: insert する先のテーブル名
    :param rows: テーブルを select * した結果の配列
    :return:
    """
    global output_file

    statement = f'INSERT INTO {table_name} VALUES '
    for i, row in enumerate(rows):
        value_list = row.values()
        tmp = '('
        for value in value_list:
            tmp += convert_to_str(table_name, i, value)
            tmp += ','
        tmp = tmp[:-1] + '),'
        statement += tmp
    statement = statement[:-1] + ';'
    output_file.write(f'{statement}\n\n')


# どのテーブルのN個目のカラムをどうマスキングするか
# ちょっと複雑なことしたかったら Lambda 関数とか使うとよさそう
MASKING_TARGET = {
    'users': {2: '***'},
}
def convert_to_str(table_name, i, value):
    """
    インポートできる形にエスケープする
    マスキング処理もここでやる
    """
    if table_name in MASKING_TARGET:
        if i in MASKING_TARGET[table_name]:
            return MASKING_TARGET[table_name][i]
    elif isinstance(value, str):
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"
    elif isinstance(value, int):
        return str(value)
    elif isinstance(value, float):
        return str(value)
    elif isinstance(value, datetime.datetime):
        return f"'{str(value)}'"
    elif isinstance(value, datetime.date):
        return f"'{str(value)}'"
    elif value is None:
        return 'null'
    # 必要に応じてパターン追記してね
    else:
        raise Exception(f'Value Error. data: {value}, data class: {value._class_}')

# create_small_db_dump() を呼ぶ段階では元のテーブルと _small なテーブルが混在していますが、
# 欲しいのは元のテーブルの情報だけなので、
# _small なテーブルを作る前に get_table_list() が呼ばれる前提で cache をもっておくと良いです
# (それが暗黙的過ぎて怖い場合には get_table_list() の中に _small なテーブルを弾く処理を書いてください)
@lru_cache(maxsize=1)
def get_table_list() -> List[str]:
    # 上で書いた処理

あとは既存のDBをdropする処理とか、create table する処理が必要になるのでササッと書きましょう。
ここまでついて来れていればすぐに書けるはず。

def build_drop_and_create_database_statement(database_name):
    global output_file

    output_file.write(f'DROP DATABASE IF EXISTS {database_name};\n')
    output_file.write(f'CREATE DATABASE IF NOT EXISTS {database_name};\n')
    output_file.write(f'USE {database_name};\n\n')


def build_create_table_statement():
    global db

    table_list = get_table_list()
    cur = db.cursor()
    for table_name in table_list:
        cur.execute(f'show create table {table_name}')
        statement = cur.fetchone()['Create Table']
        output_file.write(f'{statement};\n\n')

3. ローカルでそのdumpをimportする

ココまでくればdumpファイルをローカルにもってきてimportするだけです。

$ mysql -u user -p database < hoge.dump

お疲れさまでした。

感想

最初は部分集合なテーブルを作る方針ではなくて、元のテーブルから直接INSERT文生成しながらデータの整合性を取っていく方針だったのですが、dump(長時間) -> insert(長時間) の途中で「Foreign Keyの整合性とれませーん!」とか「このデータエスケープされてませーん!」ってエラーがでて考慮漏れ発覚。みたいな感じでPDCAのサイクルが長すぎて効率悪すぎたので、先に整合性の取れたデータ量の小さい部分集合のテーブルを作ってそれを愚直にdumpする作戦に切り替えました。

本番データを扱うので気をつけないといけないことは多いですが、開発環境でより現実に近いデータで動作確認できることで開発効率が上がると良いなーと思っています。

9
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?