Help us understand the problem. What is going on with this article?

SQLAlchemy で悲観的・楽観的排他制御

SQLAlchemy とは

SQLAlchemyは Python の ORM で、リレーショナルデータベースをオブジェクトとして扱うとこができるライブラリです。
SQLite、MySQL、PostgreSQL、Oracle などのデータベースに使用することができます。
導入するメリットとしては、

  • SQL を直接記述することなく、Python オブジェクトとしてデータベースを操作できる
  • データベースの種類によらず、同一のソースコードで複数のデータベースを併用することができる
  • SQL インジェクション対策がサポートされている

などが上げられます。
今回は SQLAlchemy と MySQL を用いて、悲観的・楽観的排他制御を実装していきます。

環境

SQLAlchemy を使用するには、以下のインストールが必要です。括弧内は今回使用したバージョンです。

  • Python (3.7.3)
  • データベース本体 (MySQL 5.7.17)
  • ドライバ (PyMySQL 0.9.3)
  • SQLAlchemy (1.3.4)

PyMySQL と SQLAlchemy はpipでインストールします。

$ pip install PyMySQL
$ pip install sqlalchemy

対応しているデータベースとドライバーについては、以下のページから確認してください。

SQLAlchemy 1.3 Documentation Dialects

SQLAlchemy でデータベースにアクセス

今回以下のようなUSERテーブルを作成しました。

mysql> show columns from USER;
+---------+-------------+------+-----+---------+----------------+
| Field   | Type        | Null | Key | Default | Extra          |
+---------+-------------+------+-----+---------+----------------+
| id      | int(11)     | NO   | PRI | NULL    | auto_increment |
| name    | varchar(30) | YES  |     | NULL    |                |
| data    | int(11)     | YES  |     | NULL    |                |
| version | int(11)     | YES  |     | NULL    |                |
+---------+-------------+------+-----+---------+----------------+
mysql> select * from USER;
+----+-------+------+---------+
| id | name  | data | version |
+----+-------+------+---------+
|  1 | test1 |  100 |       1 |
|  2 | test2 |  200 |       1 |
|  3 | test3 |  300 |       1 |
+----+-------+------+---------+

SQLAchemy を使用してデータベースにアクセスするには、

  • モジュールの読み込み
  • モデルとテーブルの作成
  • データベースエンジンの作成
  • セッションの作成

の順に行います。

# coding: utf-8
from sqlalchemy import Column, INT, VARCHAR
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

# ベースモデルの作成
Base = declarative_base()


class User(Base):
    """
    USERテーブルクラス
    必ずベースモデルを継承
    """
    # テーブル名
    __tablename__ = 'USER'
    # カラム
    id = Column(INT, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(30))
    data = Column(INT)
    version = Column(INT)

    def __repr__(self):
        return '{} = name:{}, data:{}, version:{} '.format(self.__tablename__, self.name, self.data, self.version)


if __name__ == "__main__":

    # URLの作成
    url = 'mysql+pymysql://USER:PASSWORD@localhost/DATABASE?charset=utf8'
    # エンジンクラスの作成
    engine = create_engine(url, pool_size=5, max_overflow=0, echo=True)

    # セッションの作成
    Session = scoped_session(sessionmaker(autocommit=False, bind=engine))
    session = Session()

    # クエリ実行
    # SELECT * FROM USER WHERE ID = 1;
    user = session.query(User).filter(User.id == 1).one()
    print(user)

    # セッションクローズ
    session.expunge_all()
    engine.dispose()

このコードを実行すると、以下のようにテーブルのデータが出力されます。

$ python alchemy_sample.py
USER = name:test1, data:100, version:1

悲観的排他制御

SQLAlchemy で悲観的排他的制御を行うには、クエリを取得する際にwith_for_update()を使用します。

引数 説明
なし SELECT 文に FOR UPADATE 区が追加されます
read=true LOCK IN SHARE MODEを追加し、共有ロックモードになります
nowait=true Oracle と Postgresql で有効。ロックされたいた場合、即時排他エラーとします。

実際にサンプルコードで確認します。

# coding: utf-8
import time
from multiprocessing import Process
from sqlalchemy import Column, INT, VARCHAR, TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

Base = declarative_base()


class User(Base):
    """
    USERテーブルクラス
    必ずベースモデルを継承
    """
    # テーブル名
    __tablename__ = 'USER'
    # カラム
    id = Column(INT, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(30))
    data = Column(INT)
    version = Column(INT)

    def __repr__(self):
        return '{} = name:{}, data:{}, version:{} '.format(self.__tablename__, self.name, self.data, self.version)


def db_access(p_id, id, sleep, data):
    print('プロセスID:{} 開始'.format(p_id))
    # URLの作成
    url = 'mysql+pymysql://USER:PASSWORD@localhost/DATABASE?charset=utf8'
    # エンジンクラスの作成
    engine = create_engine(url, pool_size=5, max_overflow=0)

    # セッションの作成
    Session = scoped_session(sessionmaker(autocommit=False, bind=engine))
    session = Session()

    # 悲観的排他制御
    # SELECT * FROM USER WHERE ID = 1 FOR UPDATE;
    user = session.query(User).filter(
        User.id == id).with_for_update().one()
    print('プロセスID:{} , 取得:{}'.format(p_id, user))
    # UPDATE USER SET DATA = ${data}, VERSION = VERSION + 1 WHERE ID = 1;
    user.data = data
    user.version += 1

    # スリープ
    time.sleep(sleep)

    # コミット
    session.commit()
    print('プロセスID:{} , 更新:{}'.format(p_id, user))
    session.expunge_all()
    engine.dispose()


if __name__ == "__main__":

    process1 = Process(target=db_access, args=(1, 1, 10, 10))
    process2 = Process(target=db_access, args=(2, 1, 2, 2))

    process1.start()
    process2.start()

実行結果は以下のようになります。

プロセスID:1 開始
プロセスID:2 開始
プロセスID:1 , 取得:USER = name:test1, data:100, version:1
プロセスID:1 , 更新:USER = name:test1, data:10, version:2
プロセスID:2 , 取得:USER = name:test1, data:10, version:2
プロセスID:2 , 更新:USER = name:test1, data:2, version:3
mysql> select * from user where id = 1;
+----+-------+------+---------+
| id | name  | data | version |
+----+-------+------+---------+
|  1 | test1 |    2 |       3 |
+----+-------+------+---------+

先にロックをかけたプロセスの更新が終了してから、次のプロセスの更新の処理が実行されているのが確認できます。

楽観的排他制御

SQLAlchemy で楽観的排他制御を実行する場合は、モデルにバージョン情報を保存するフィールドを作成します。
そのバージョン情報を__mapper_args__属性に、version_id_colとして定義します。

class User(Base):
    """
    USERテーブルクラス
    必ずベースモデルを継承
    """
    # テーブル名
    __tablename__ = 'USER'
    # カラム
    id = Column(INT, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(30))
    data = Column(INT)
    version = Column(BigInteger, nullable=False)
    # 楽観的排他制御
    __mapper_args__ = {'version_id_col': version}

version_id_colで定義することで、定義されたカラムをバージョン情報として追跡することができ、SQLAlchemy のセッションが commit されデータが更新されると、バージョン情報をカウントアップすることができます。
更新が競合した場合はsqlalchemy.orm.exc.StaleDataErrorが発生します。

実際にサンプルコードで確認します。

# coding: utf-8
import time
from multiprocessing import Process
from sqlalchemy import Column, INT, VARCHAR, TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

Base = declarative_base()


class User(Base):
    """
    USERテーブルクラス
    必ずベースモデルを継承
    """
    # テーブル名
    __tablename__ = 'USER'
    # カラム
    id = Column(INT, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(30))
    data = Column(INT)
    version = Column(INT, nullable=False)
    # 楽観的排他制御
    __mapper_args__ = {'version_id_col': version}

    def __repr__(self):
        return '{} = name:{}, data:{}, version:{} '.format(self.__tablename__, self.name, self.data, self.version)


def db_access(p_id, id, sleep, data):
    print('プロセスID:{} 開始'.format(p_id))
    # URLの作成
    url = 'mysql+pymysql://USER:PASSWORD@localhost/DATABASE?charset=utf8'
    # エンジンクラスの作成
    engine = create_engine(url, pool_size=5, max_overflow=0)

    # セッションの作成
    Session = scoped_session(sessionmaker(autocommit=False, bind=engine))
    session = Session()

    # クエリ実行
    # SELECT * FROM USER WHERE ID = 1;
    user = session.query(User).filter(User.id == id).one()
    print('プロセスID:{} , 取得:{}'.format(p_id, user))
    # UPDATE USER SET DATA = ${data}, VERSION = VERSION + 1 WHERE ID = 1;
    user.data = data
    user.version += 1

    # スリープ
    time.sleep(sleep)

    # コミット
    session.commit()
    print('プロセスID:{} , 更新:{}'.format(p_id, user))
    session.expunge_all()
    engine.dispose()


if __name__ == "__main__":

    process1 = Process(target=db_access, args=(1, 1, 10, 10))
    process2 = Process(target=db_access, args=(2, 1, 2, 2))

    process1.start()
    process2.start()

実行結果は以下のようになります。

プロセスID:1 開始
プロセスID:2 開始
プロセスID:2 , 取得:USER = name:test1, data:100, version:1
プロセスID:1 , 取得:USER = name:test1, data:100, version:1
プロセスID:2 , 更新:USER = name:test1, data:2, version:2
>>>省略
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'USER' expected to update 1 row(s); 0 were matched.
mysql> select * from user where id = 1;
+----+-------+------+---------+
| id | name  | data | version |
+----+-------+------+---------+
|  1 | test1 |    2 |       2 |
+----+-------+------+---------+

同じバージョン情報のデータを取得したセッションのうち、先に commit したセッションはデータを更新できます。
その後、同じバージョン情報を保持したセッションを commit しようとすると、すでにバージョン情報が更新されているため、'sqlalchemy.orm.exc.StaleDataError`が発生し、トランザクションをロールバックします。

まとめ

SQLAlchemy を用いた排他制御についてまとめました。
SQLAlchemy 自体、日本語で解説しているサイトがまだまだ少ないため、参考にしていただけたらと思います。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした