LoginSignup
22
18

More than 3 years have passed since last update.

SQLAlchemy で悲観的・楽観的排他制御

Last updated at Posted at 2019-06-18

SQLAlchemy とは

SQLAlchemyは Python の ORM で、リレーショナルデータベースをオブジェクトとして扱うとこができるライブラリです。
SQLite、MySQL、PostgreSQL、Oracle などのデータベースに使用することができます。
導入するメリットとしては、

  • SQL を直接記述することなく、Python オブジェクトとしてデータベースを操作できる
  • データベースの種類によらず、同一のソースコードで複数のデータベースを併用することができる
  • SQL インジェクション対策がサポートされている

などが上げられます。
今回は SQLAlchemy と MySQL を用いて、悲観的・楽観的排他制御を実装していきます。

環境

SQLAlchemy を使用するには、以下のインストールが必要です。括弧内は今回使用したバージョンです。

  • Python (3.7.3)
  • データベース本体 (MySQL 5.7.17)
  • ドライバ (PyMySQL 0.9.3)
  • SQLAlchemy (1.3.4)

PyMySQL と SQLAlchemy はpipでインストールします。

$ pip install PyMySQL
$ pip install sqlalchemy

対応しているデータベースとドライバーについては、以下のページから確認してください。

SQLAlchemy 1.3 Documentation Dialects

SQLAlchemy でデータベースにアクセス

今回以下のようなUSERテーブルを作成しました。

mysql> show columns from USER;
+---------+-------------+------+-----+---------+----------------+
| Field   | Type        | Null | Key | Default | Extra          |
+---------+-------------+------+-----+---------+----------------+
| id      | int(11)     | NO   | PRI | NULL    | auto_increment |
| name    | varchar(30) | YES  |     | NULL    |                |
| data    | int(11)     | YES  |     | NULL    |                |
| version | int(11)     | YES  |     | NULL    |                |
+---------+-------------+------+-----+---------+----------------+
mysql> select * from USER;
+----+-------+------+---------+
| id | name  | data | version |
+----+-------+------+---------+
|  1 | test1 |  100 |       1 |
|  2 | test2 |  200 |       1 |
|  3 | test3 |  300 |       1 |
+----+-------+------+---------+

SQLAchemy を使用してデータベースにアクセスするには、

  • モジュールの読み込み
  • モデルとテーブルの作成
  • データベースエンジンの作成
  • セッションの作成

の順に行います。

# coding: utf-8
from sqlalchemy import Column, INT, VARCHAR
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

# ベースモデルの作成
Base = declarative_base()


class User(Base):
    """
    USERテーブルクラス
    必ずベースモデルを継承
    """
    # テーブル名
    __tablename__ = 'USER'
    # カラム
    id = Column(INT, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(30))
    data = Column(INT)
    version = Column(INT)

    def __repr__(self):
        return '{} = name:{}, data:{}, version:{} '.format(self.__tablename__, self.name, self.data, self.version)


if __name__ == "__main__":

    # URLの作成
    url = 'mysql+pymysql://USER:PASSWORD@localhost/DATABASE?charset=utf8'
    # エンジンクラスの作成
    engine = create_engine(url, pool_size=5, max_overflow=0, echo=True)

    # セッションの作成
    Session = scoped_session(sessionmaker(autocommit=False, bind=engine))
    session = Session()

    # クエリ実行
    # SELECT * FROM USER WHERE ID = 1;
    user = session.query(User).filter(User.id == 1).one()
    print(user)

    # セッションクローズ
    session.expunge_all()
    engine.dispose()

このコードを実行すると、以下のようにテーブルのデータが出力されます。

$ python alchemy_sample.py
USER = name:test1, data:100, version:1

悲観的排他制御

SQLAlchemy で悲観的排他的制御を行うには、クエリを取得する際にwith_for_update()を使用します。

引数 説明
なし SELECT 文に FOR UPADATE 区が追加されます
read=true LOCK IN SHARE MODEを追加し、共有ロックモードになります
nowait=true Oracle と Postgresql で有効。ロックされたいた場合、即時排他エラーとします。

実際にサンプルコードで確認します。

# coding: utf-8
import time
from multiprocessing import Process
from sqlalchemy import Column, INT, VARCHAR, TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

Base = declarative_base()


class User(Base):
    """
    USERテーブルクラス
    必ずベースモデルを継承
    """
    # テーブル名
    __tablename__ = 'USER'
    # カラム
    id = Column(INT, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(30))
    data = Column(INT)
    version = Column(INT)

    def __repr__(self):
        return '{} = name:{}, data:{}, version:{} '.format(self.__tablename__, self.name, self.data, self.version)


def db_access(p_id, id, sleep, data):
    print('プロセスID:{} 開始'.format(p_id))
    # URLの作成
    url = 'mysql+pymysql://USER:PASSWORD@localhost/DATABASE?charset=utf8'
    # エンジンクラスの作成
    engine = create_engine(url, pool_size=5, max_overflow=0)

    # セッションの作成
    Session = scoped_session(sessionmaker(autocommit=False, bind=engine))
    session = Session()

    # 悲観的排他制御
    # SELECT * FROM USER WHERE ID = 1 FOR UPDATE;
    user = session.query(User).filter(
        User.id == id).with_for_update().one()
    print('プロセスID:{} , 取得:{}'.format(p_id, user))
    # UPDATE USER SET DATA = ${data}, VERSION = VERSION + 1 WHERE ID = 1;
    user.data = data
    user.version += 1

    # スリープ
    time.sleep(sleep)

    # コミット
    session.commit()
    print('プロセスID:{} , 更新:{}'.format(p_id, user))
    session.expunge_all()
    engine.dispose()


if __name__ == "__main__":

    process1 = Process(target=db_access, args=(1, 1, 10, 10))
    process2 = Process(target=db_access, args=(2, 1, 2, 2))

    process1.start()
    process2.start()

実行結果は以下のようになります。

プロセスID:1 開始
プロセスID:2 開始
プロセスID:1 , 取得:USER = name:test1, data:100, version:1
プロセスID:1 , 更新:USER = name:test1, data:10, version:2
プロセスID:2 , 取得:USER = name:test1, data:10, version:2
プロセスID:2 , 更新:USER = name:test1, data:2, version:3
mysql> select * from user where id = 1;
+----+-------+------+---------+
| id | name  | data | version |
+----+-------+------+---------+
|  1 | test1 |    2 |       3 |
+----+-------+------+---------+

先にロックをかけたプロセスの更新が終了してから、次のプロセスの更新の処理が実行されているのが確認できます。

楽観的排他制御

SQLAlchemy で楽観的排他制御を実行する場合は、モデルにバージョン情報を保存するフィールドを作成します。
そのバージョン情報を__mapper_args__属性に、version_id_colとして定義します。

class User(Base):
    """
    USERテーブルクラス
    必ずベースモデルを継承
    """
    # テーブル名
    __tablename__ = 'USER'
    # カラム
    id = Column(INT, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(30))
    data = Column(INT)
    version = Column(BigInteger, nullable=False)
    # 楽観的排他制御
    __mapper_args__ = {'version_id_col': version}

version_id_colで定義することで、定義されたカラムをバージョン情報として追跡することができ、SQLAlchemy のセッションが commit されデータが更新されると、バージョン情報をカウントアップすることができます。
更新が競合した場合はsqlalchemy.orm.exc.StaleDataErrorが発生します。

実際にサンプルコードで確認します。

# coding: utf-8
import time
from multiprocessing import Process
from sqlalchemy import Column, INT, VARCHAR, TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

Base = declarative_base()


class User(Base):
    """
    USERテーブルクラス
    必ずベースモデルを継承
    """
    # テーブル名
    __tablename__ = 'USER'
    # カラム
    id = Column(INT, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(30))
    data = Column(INT)
    version = Column(INT, nullable=False)
    # 楽観的排他制御
    __mapper_args__ = {'version_id_col': version}

    def __repr__(self):
        return '{} = name:{}, data:{}, version:{} '.format(self.__tablename__, self.name, self.data, self.version)


def db_access(p_id, id, sleep, data):
    print('プロセスID:{} 開始'.format(p_id))
    # URLの作成
    url = 'mysql+pymysql://USER:PASSWORD@localhost/DATABASE?charset=utf8'
    # エンジンクラスの作成
    engine = create_engine(url, pool_size=5, max_overflow=0)

    # セッションの作成
    Session = scoped_session(sessionmaker(autocommit=False, bind=engine))
    session = Session()

    # クエリ実行
    # SELECT * FROM USER WHERE ID = 1;
    user = session.query(User).filter(User.id == id).one()
    print('プロセスID:{} , 取得:{}'.format(p_id, user))
    # UPDATE USER SET DATA = ${data}, VERSION = VERSION + 1 WHERE ID = 1;
    user.data = data
    user.version += 1

    # スリープ
    time.sleep(sleep)

    # コミット
    session.commit()
    print('プロセスID:{} , 更新:{}'.format(p_id, user))
    session.expunge_all()
    engine.dispose()


if __name__ == "__main__":

    process1 = Process(target=db_access, args=(1, 1, 10, 10))
    process2 = Process(target=db_access, args=(2, 1, 2, 2))

    process1.start()
    process2.start()

実行結果は以下のようになります。

プロセスID:1 開始
プロセスID:2 開始
プロセスID:2 , 取得:USER = name:test1, data:100, version:1
プロセスID:1 , 取得:USER = name:test1, data:100, version:1
プロセスID:2 , 更新:USER = name:test1, data:2, version:2
>>>省略
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'USER' expected to update 1 row(s); 0 were matched.
mysql> select * from user where id = 1;
+----+-------+------+---------+
| id | name  | data | version |
+----+-------+------+---------+
|  1 | test1 |    2 |       2 |
+----+-------+------+---------+

同じバージョン情報のデータを取得したセッションのうち、先に commit したセッションはデータを更新できます。
その後、同じバージョン情報を保持したセッションを commit しようとすると、すでにバージョン情報が更新されているため、'sqlalchemy.orm.exc.StaleDataError`が発生し、トランザクションをロールバックします。

まとめ

SQLAlchemy を用いた排他制御についてまとめました。
SQLAlchemy 自体、日本語で解説しているサイトがまだまだ少ないため、参考にしていただけたらと思います。

22
18
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
22
18