Python
migration
sqlalchemy
ORM

SQLAlchemyでマイグレーションした時にモデルとスキーマを同期する

pythonでテスト用DBを立てようと思ったが、1からの環境構築が面倒だったので
Docker+マイグレーションツールでサクっと構築したいと思った。

ついでにDB周りのコードもほぼ手書きだったので、O/Rマッパーも導入
SQLAlchemyとSQLAlchemy-Migrateで実現できそうだったのでやってみた。

実行環境

  • Windows10
  • Python 3.6.3
  • MySQL 14.14 (Docker Image)
  • SQLAlchemy 1.2.2
  • SQLAlchemy-Migrate 0.11.0
  • PyMySQL 0.8.0
  • mysqlclient 1.3.12

インストール

pip install sqlalchemy
pip install sqlalchemy-migrate
pip install pymysql
pip install mysqlclient

テスト用DB(MySQL)

DockerでサクっとローカルDBを立てます。

docker run --name mysql -e MYSQL_ROOT_PASSWORD=mysql -p 3306:3306 -d mysql

データベースは事前に用意する必要があるのでarchiveデータベースを作成します。

docker exec -it mysql bash

root@# mysql -uroot -pmysql
mysql> create database archive;
mysql> exit
root@# exit

SQLAlchemy-Migrateでマイグレーション

DBマイグレーションにはSQLAlchemy-Migrateを使用します。

マイグレーションプロジェクト用のファイル類をコマンドmigrate create <作成されるフォルダ名> "<リポジトリID>"で生成します。
dbは作業ディレクトリです。名前はなんでもいいです。

mkdir db
cd db
migrate create migrate "archive"

以下の構成で生成されます。

tree
project
|
└─db
  └─migrate
      │  manage.py
      │  migrate.cfg
      │  README
      │  __init__.py
      │
      └─versions
              __init__.py

manage.pyの編集

manage.pyを編集し、データベースurlとリポジトリパスを設定します。
データベースurlはdialect+driver://username:password@host:port/databaseの書式で記述し、データベース毎の具体的な記述方法はDatabase Urlsに一覧があります。
今回はMySQLを使用するので以下のようにmanage.pyを編集、リポジトリパスはmigrateディレクトリ配下ですが、manage.pyから見たらカレントなので'.'を指定します。

manage.py
if __name__ == '__main__':
    main(debug='False', url='mysql+pymysql://root:mysql@localhost/archive?charset=utf8', repository='.')

バージョン管理テーブルの生成

データベースにバージョン管理用のテーブルが必要になる為、以下コマンドで生成します。

cd migrate
python manage.py version_control
# --> migrate_versionテーブルが作られる

本来はデータベースurlとリポジトリパスが引数に必要ですが、manage.pyに定義済みの為上記のように省略してOKです。

# manage.pyで未定義、もしくは別環境に作成する場合
python manage.py version_control mysql+pymysql://root:mysql@localhost/archive?charset=utf8 .

バージョン確認方法

作成されたmigrate_versionテーブルを参照してもいいし、migrate version .コマンドでも確認できます。

booksテーブル作成

マイグレーションはスキーマ変更のスクリプトを元に行いますが、以下のコマンドでテンプレートから生成することができます。

python manage.py script "add books table"
# --> versions/001_add_books_table.pyが作成される

生成された001_add_books_table.pyにテーブルスキーマを定義し、テーブル追加のマイグレーションファイルとします。

001_add_books_table.py
from sqlalchemy import *
from migrate import *
from sqlalchemy.dialects.mysql import BIGINT, VARCHAR, DATETIME, TEXT

meta = MetaData()
table = Table(
    'books', meta,
    Column('id', BIGINT(unsigned=True), primary_key=True),
    Column('title', TEXT, nullable=False),
    Column('author', VARCHAR(64), nullable=False),
    Column('created_at', DATETIME, nullable=False),
    Column('updated_at', DATETIME, nullable=False))


def upgrade(migrate_engine):
    meta.bind = migrate_engine
    table.create()


def downgrade(migrate_engine):
    meta.bind = migrate_engine
    table.drop()

マイグレーション実行

upgradeコマンドで実行です。

python manage.py upgrade
# --> booksテーブルが作成される

downgradeコマンドで元に戻せますし、どこまで戻るかも指定できます。

python manage.py downgrade 0
# --> booksテーブルが破棄される(バージョン0まで戻る)

SQLAlchemyでDBアクセス

SQLAlchemyを使ってDBに触れてみます。

チュートリアルを参考に、dbディレクトリ直下にファイルを作成していきます。

__init__.py
#空ファイルです
session.py
# !/usr/bin/env python3
# coding: utf-8
""" session.py """

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    'mysql+pymysql://root:mysql@localhost/archive?charset=utf8', echo=True)
Session = sessionmaker(bind=engine)

Bookクラスとしてテーブルを定義し、動作確認用にfindメソッドを定義しています。
テーブルスキーマの部分はマイグレーションのスクリプト001_add_books_table.pyからコピって整形しています。
ここちょっとイケてない実装な感じがしましたが、最初だけだと我慢して次に進みます。

book.py
# !/usr/bin/env python3
# coding: utf-8
""" book.py """

from sqlalchemy import Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.mysql import BIGINT, VARCHAR, DATETIME, TEXT
from db.session import Session

Base = declarative_base()


class Book(Base):
    __tablename__ = 'books'

    id = Column('id', BIGINT(unsigned=True), primary_key=True)
    title = Column('title', TEXT, nullable=False)
    author = Column('author', VARCHAR(64), nullable=False)
    created_at = Column('created_at', DATETIME, nullable=False)
    updated_at = Column('updated_at', DATETIME, nullable=False)

    @classmethod
    def find(cls, id):
        """ find record by id """
        session = Session()
        record = session.query(Book).filter(Book.id == id).first()
        session.close()
        return record

testsディレクトリを作成して、テストも書いとく

tests/test_book.py
# !/usr/bin/env python3
# coding: utf-8
""" test_book.py """

import unittest
from db.book import Book


class TestBook(unittest.TestCase):
    """ test Book class """

    def test_book_find(self):
        """ Book.find return not None """
        ret = Book.find(1)
        self.assertIsNotNone(ret)


if __name__ == "__main__":
    unittest.main()

この時点でのディレクトリ構成

project
│
├─db
│  │  book.py
│  │  session.py
│  │  __init__.py
│  │
│  └─migrate
│      │  manage.py
│      │  migrate.cfg
│      │  README
│      │  __init__.py
│      │
│      └─versions
│              001_add_books_table.py
│              __init__.py
│
└─tests
        test_book.py

テーブルスキーマの変更

せっかく(?)マイグレーションできるようにしたので、テーブルスキーマを変えてみたいと思います。

isbnカラムの追加

マイグレーションファイルの作成

python manage.py script "add isbn column"
# --> 002_add_isbn_column.pyが作成される
002_add_isbn_column.py
from sqlalchemy import *
from migrate import *
from sqlalchemy.dialects.mysql import BIGINT


def upgrade(migrate_engine):
    meta = MetaData(bind=migrate_engine)
    table = Table('books', meta, autoload=True)
    isbn = Column('isbn', BIGINT(13),  nullable=False)
    isbn.create(table)


def downgrade(migrate_engine):
    meta = MetaData(bind=migrate_engine)
    table = Table('books', meta, autoload=True)
    table.c.isbn.drop()

マイグレーション実行

python manage.py upgrade

続けてBookクラスも修正します。

book.py
class Book(Base):
    __tablename__ = 'books'

    id = Column('id', BIGINT(unsigned=True), primary_key=True)
    title = Column('title', TEXT, nullable=False)
    author = Column('author', VARCHAR(64), nullable=False)
    isbn = Column('isbn', BIGINT(13),  nullable=False)          # <-- 追加
    created_at = Column('created_at', DATETIME, nullable=False)
    updated_at = Column('updated_at', DATETIME, nullable=False)

ここで突然ブチキレですよ

マイグレーションする度にテーブルクラスを手動で変更するのはイケてないです。
マイグレーションファイルにスキーマ定義を書いているので、テーブルクラスが持つスキーマ定義もupgrade downgradeに追従して欲しいのが期待するところです。

...

調べたらやっぱりありました。Automapを使えばできそうです。

Automapを使用した実装

declarative_baseの代わりにautomap_baseを使います。

book.py
# !/usr/bin/env python3
# coding: utf-8
""" book.py """

from sqlalchemy import Column
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.dialects.mysql import BIGINT, VARCHAR, DATETIME, TEXT
from db.session import Session, engine

Base = automap_base()


class Book(Base):
    __tablename__ = 'books'

    @classmethod
    def find(cls, id):
        """ find record by id """
        session = Session()
        record = session.query(Book).filter(Book.id == id).first()
        session.close()
        return record


Base.prepare(engine, reflect=True)

Bookクラスからスキーマ情報が一切無くなりました!イケてる!
ポイントはBase.prepare(engine, reflect=True)です。ここでengineからスキーマ情報を読み出し、Bookクラスに注入します。
注意点として、Base.prepare(engine, reflect=True)Bookクラス定義の後でなければなりません。
先に呼び出すとInvalidRequestError: SQL expression, column, or mapped entity expectedを吐きます。

test_book.py
    def test_access_isbn(self):
        """ Book#isbn can access """
        ret = Book.find(1)
        self.assertEqual(9783161484100, ret.isbn)

追加のテストコードもきちんと通りました。

課題:セッション管理について

例としてfindメソッドを定義していましたが、メソッド内でセッションのopencloseをしているのがやっぱりイケてない気がします。
findメソッドをトランザクション処理の一部として利用できませんし、Session Frequently Asked Questionsでアンチパターンとして紹介もされています。

book.py
    @classmethod
    def find(cls, id):
        """ find record by id """
        session = Session()
        record = session.query(Book).filter(Book.id == id).first()
        session.close()
        return record

どうしたらいいかな。。。

追記 Instance is not bound to a Session

セッションをcommit、closeした後にセッション内のインスタンスにアクセスすると以下エラーが発生した。

sqlalchemy.orm.exc.DetachedInstanceError: Instance <Book at 0x697a4f0> is not bound to a Session; attribute refresh operation cannot proceed (Background on this error at: http://sqlalche.me/e/bhk3)

どうもデフォルトだとcommitするとセッション内のインスタンスが全て期限切れになるようだ。

Another behavior of commit() is that by default it expires the state of all instances present after the commit is complete. This is so that when the instances are next accessed, either through attribute access or by them being present in a Query result set, they receive the most recent state. To disable this behavior, configure sessionmaker with expire_on_commit=False.

最後に書いてある通り、sessionmakerexpire_on_commit=Falseオプションを付けると回避可能

Session = sessionmaker(bind=engine, expire_on_commit=False)