SQLAlchemy とは
SQLAlchemyは Python の ORM で、リレーショナルデータベースをオブジェクトとして扱うとこができるライブラリです。
SQLite、MySQL、PostgreSQL、Oracle などのデータベースに使用することができます。
導入するメリットとしては、
- SQL を直接記述することなく、Python オブジェクトとしてデータベースを操作できる
- データベースの種類によらず、同一のソースコードで複数のデータベースを併用することができる
- SQL インジェクション対策がサポートされている
などが上げられます。
今回は SQLAlchemy と MySQL を用いて、悲観的・楽観的排他制御を実装していきます。
環境
SQLAlchemy を使用するには、以下のインストールが必要です。括弧内は今回使用したバージョンです。
- Python (3.7.3)
- データベース本体 (MySQL 5.7.17)
- ドライバ (PyMySQL 0.9.3)
- SQLAlchemy (1.3.4)
PyMySQL と SQLAlchemy はpip
でインストールします。
$ pip install PyMySQL
$ pip install sqlalchemy
対応しているデータベースとドライバーについては、以下のページから確認してください。
SQLAlchemy 1.3 Documentation Dialects
SQLAlchemy でデータベースにアクセス
今回以下のようなUSER
テーブルを作成しました。
mysql> show columns from USER;
+---------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(30) | YES | | NULL | |
| data | int(11) | YES | | NULL | |
| version | int(11) | YES | | NULL | |
+---------+-------------+------+-----+---------+----------------+
mysql> select * from USER;
+----+-------+------+---------+
| id | name | data | version |
+----+-------+------+---------+
| 1 | test1 | 100 | 1 |
| 2 | test2 | 200 | 1 |
| 3 | test3 | 300 | 1 |
+----+-------+------+---------+
SQLAchemy を使用してデータベースにアクセスするには、
- モジュールの読み込み
- モデルとテーブルの作成
- データベースエンジンの作成
- セッションの作成
の順に行います。
# coding: utf-8
from sqlalchemy import Column, INT, VARCHAR
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
# ベースモデルの作成
Base = declarative_base()
class User(Base):
"""
USERテーブルクラス
必ずベースモデルを継承
"""
# テーブル名
__tablename__ = 'USER'
# カラム
id = Column(INT, primary_key=True, autoincrement=True)
name = Column(VARCHAR(30))
data = Column(INT)
version = Column(INT)
def __repr__(self):
return '{} = name:{}, data:{}, version:{} '.format(self.__tablename__, self.name, self.data, self.version)
if __name__ == "__main__":
# URLの作成
url = 'mysql+pymysql://USER:PASSWORD@localhost/DATABASE?charset=utf8'
# エンジンクラスの作成
engine = create_engine(url, pool_size=5, max_overflow=0, echo=True)
# セッションの作成
Session = scoped_session(sessionmaker(autocommit=False, bind=engine))
session = Session()
# クエリ実行
# SELECT * FROM USER WHERE ID = 1;
user = session.query(User).filter(User.id == 1).one()
print(user)
# セッションクローズ
session.expunge_all()
engine.dispose()
このコードを実行すると、以下のようにテーブルのデータが出力されます。
$ python alchemy_sample.py
USER = name:test1, data:100, version:1
悲観的排他制御
SQLAlchemy で悲観的排他的制御を行うには、クエリを取得する際にwith_for_update()
を使用します。
引数 | 説明 |
---|---|
なし | SELECT 文に FOR UPADATE 区が追加されます |
read=true |
LOCK IN SHARE MODE を追加し、共有ロックモードになります |
nowait=true | Oracle と Postgresql で有効。ロックされたいた場合、即時排他エラーとします。 |
実際にサンプルコードで確認します。
# coding: utf-8
import time
from multiprocessing import Process
from sqlalchemy import Column, INT, VARCHAR, TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
Base = declarative_base()
class User(Base):
"""
USERテーブルクラス
必ずベースモデルを継承
"""
# テーブル名
__tablename__ = 'USER'
# カラム
id = Column(INT, primary_key=True, autoincrement=True)
name = Column(VARCHAR(30))
data = Column(INT)
version = Column(INT)
def __repr__(self):
return '{} = name:{}, data:{}, version:{} '.format(self.__tablename__, self.name, self.data, self.version)
def db_access(p_id, id, sleep, data):
print('プロセスID:{} 開始'.format(p_id))
# URLの作成
url = 'mysql+pymysql://USER:PASSWORD@localhost/DATABASE?charset=utf8'
# エンジンクラスの作成
engine = create_engine(url, pool_size=5, max_overflow=0)
# セッションの作成
Session = scoped_session(sessionmaker(autocommit=False, bind=engine))
session = Session()
# 悲観的排他制御
# SELECT * FROM USER WHERE ID = 1 FOR UPDATE;
user = session.query(User).filter(
User.id == id).with_for_update().one()
print('プロセスID:{} , 取得:{}'.format(p_id, user))
# UPDATE USER SET DATA = ${data}, VERSION = VERSION + 1 WHERE ID = 1;
user.data = data
user.version += 1
# スリープ
time.sleep(sleep)
# コミット
session.commit()
print('プロセスID:{} , 更新:{}'.format(p_id, user))
session.expunge_all()
engine.dispose()
if __name__ == "__main__":
process1 = Process(target=db_access, args=(1, 1, 10, 10))
process2 = Process(target=db_access, args=(2, 1, 2, 2))
process1.start()
process2.start()
実行結果は以下のようになります。
プロセスID:1 開始
プロセスID:2 開始
プロセスID:1 , 取得:USER = name:test1, data:100, version:1
プロセスID:1 , 更新:USER = name:test1, data:10, version:2
プロセスID:2 , 取得:USER = name:test1, data:10, version:2
プロセスID:2 , 更新:USER = name:test1, data:2, version:3
mysql> select * from user where id = 1;
+----+-------+------+---------+
| id | name | data | version |
+----+-------+------+---------+
| 1 | test1 | 2 | 3 |
+----+-------+------+---------+
先にロックをかけたプロセスの更新が終了してから、次のプロセスの更新の処理が実行されているのが確認できます。
楽観的排他制御
SQLAlchemy で楽観的排他制御を実行する場合は、モデルにバージョン情報を保存するフィールドを作成します。
そのバージョン情報を__mapper_args__
属性に、version_id_col
として定義します。
class User(Base):
"""
USERテーブルクラス
必ずベースモデルを継承
"""
# テーブル名
__tablename__ = 'USER'
# カラム
id = Column(INT, primary_key=True, autoincrement=True)
name = Column(VARCHAR(30))
data = Column(INT)
version = Column(BigInteger, nullable=False)
# 楽観的排他制御
__mapper_args__ = {'version_id_col': version}
version_id_col
で定義することで、定義されたカラムをバージョン情報として追跡することができ、SQLAlchemy のセッションが commit されデータが更新されると、バージョン情報をカウントアップすることができます。
更新が競合した場合はsqlalchemy.orm.exc.StaleDataError
が発生します。
実際にサンプルコードで確認します。
# coding: utf-8
import time
from multiprocessing import Process
from sqlalchemy import Column, INT, VARCHAR, TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
Base = declarative_base()
class User(Base):
"""
USERテーブルクラス
必ずベースモデルを継承
"""
# テーブル名
__tablename__ = 'USER'
# カラム
id = Column(INT, primary_key=True, autoincrement=True)
name = Column(VARCHAR(30))
data = Column(INT)
version = Column(INT, nullable=False)
# 楽観的排他制御
__mapper_args__ = {'version_id_col': version}
def __repr__(self):
return '{} = name:{}, data:{}, version:{} '.format(self.__tablename__, self.name, self.data, self.version)
def db_access(p_id, id, sleep, data):
print('プロセスID:{} 開始'.format(p_id))
# URLの作成
url = 'mysql+pymysql://USER:PASSWORD@localhost/DATABASE?charset=utf8'
# エンジンクラスの作成
engine = create_engine(url, pool_size=5, max_overflow=0)
# セッションの作成
Session = scoped_session(sessionmaker(autocommit=False, bind=engine))
session = Session()
# クエリ実行
# SELECT * FROM USER WHERE ID = 1;
user = session.query(User).filter(User.id == id).one()
print('プロセスID:{} , 取得:{}'.format(p_id, user))
# UPDATE USER SET DATA = ${data}, VERSION = VERSION + 1 WHERE ID = 1;
user.data = data
user.version += 1
# スリープ
time.sleep(sleep)
# コミット
session.commit()
print('プロセスID:{} , 更新:{}'.format(p_id, user))
session.expunge_all()
engine.dispose()
if __name__ == "__main__":
process1 = Process(target=db_access, args=(1, 1, 10, 10))
process2 = Process(target=db_access, args=(2, 1, 2, 2))
process1.start()
process2.start()
実行結果は以下のようになります。
プロセスID:1 開始
プロセスID:2 開始
プロセスID:2 , 取得:USER = name:test1, data:100, version:1
プロセスID:1 , 取得:USER = name:test1, data:100, version:1
プロセスID:2 , 更新:USER = name:test1, data:2, version:2
>>>省略
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'USER' expected to update 1 row(s); 0 were matched.
mysql> select * from user where id = 1;
+----+-------+------+---------+
| id | name | data | version |
+----+-------+------+---------+
| 1 | test1 | 2 | 2 |
+----+-------+------+---------+
同じバージョン情報のデータを取得したセッションのうち、先に commit したセッションはデータを更新できます。
その後、同じバージョン情報を保持したセッションを commit しようとすると、すでにバージョン情報が更新されているため、'sqlalchemy.orm.exc.StaleDataError`が発生し、トランザクションをロールバックします。
まとめ
SQLAlchemy を用いた排他制御についてまとめました。
SQLAlchemy 自体、日本語で解説しているサイトがまだまだ少ないため、参考にしていただけたらと思います。