Leapcell: The Next - Gen Serverless Platform for Web Hosting, Async Tasks, and Redis
SQLAlchemyチュートリアル
SQLAlchemyはPythonエコシステムで最も人気のあるオブジェクトリレーショナルマッピング(ORM)です。エレガントな設計があり、下層のCoreと上層の伝統的なORMの2つの部分に分かれています。Pythonや他の言語の多くのORMでは、良い階層設計が実装されていません。例えば、DjangoのORMでは、データベース接続とORM自体が完全に混ざっています。
なぜCoreが必要なのか?
Core層は主にクライアント接続プールを実装しています。現代のWebアプリケーションの中核として、リレーショナルデータベースの同時接続能力は一般的に強くありません。大量の短い接続を使用することは推奨されず、ほとんどの場合、接続プールが必要です。接続プールには大まかに2種類あります。
- サーバー側接続プール:専用の接続プールミドルウェアで、短い接続のたびに再利用するための長い接続を割り当てます。
- クライアント側接続プール:一般的にサードパーティライブラリとしてコードに導入されます。
SQLAlchemyの接続プールはクライアント側接続プールに属します。この接続プールでは、SQLAlchemyは一定数の長い接続を維持しています。connect
が呼び出されると、実際にはプールから接続を取得し、close
が呼び出されると、実際には接続をプールに返します。
接続の作成
SQLAlchemyでは、create_engine
を使用して接続(プール)を作成します。create_engine
のパラメータはデータベースのURLです。
from sqlalchemy import create_engine
# MySQL接続の例
engine = create_engine(
"mysql://user:password@localhost:3306/dbname",
echo=True, # echoをTrueに設定すると、実際に実行されるSQLが出力され、デバッグが容易になります
future=True, # SQLAlchemy 2.0 APIを使用します。これは下位互換性があります
pool_size=5, # 接続プールのサイズはデフォルトで5です。0に設定すると、接続に制限はありません
pool_recycle=3600 # データベースの自動切断を制限する時間を設定します
)
# メモリ内のSQLiteデータベースを作成します。check_same_thread=Falseを追加する必要があります。そうしないと、マルチスレッド環境で使用できません
engine = create_engine("sqlite:///:memory:", echo=True, future=True,
connect_args={"check_same_thread": False})
# MySQLに接続する別の方法
# pip install mysqlclient
engine = create_engine('mysql+mysqldb://user:password@localhost/foo?charset=utf8mb4')
Core層 — 直接SQLを使用する
CRUD
from sqlalchemy import text
with engine.connect() as conn:
result = conn.execute(text("select * from users"))
print(result.all())
# 結果は反復可能で、各行の結果はRowオブジェクトです
for row in result:
# Rowオブジェクトは3つのアクセス方法をサポートします
print(row.x, row.y)
print(row[0], row[1])
print(row["x"], row["y"])
# パラメータを渡すには、`:var`を使用します
result = conn.execute(
text("SELECT x, y FROM some_table WHERE y > :y"),
{"y": 2}
)
# パラメータを事前にコンパイルすることもできます
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)
# 挿入するときは、複数行を直接挿入することができます
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[{"x": 11, "y": 12}, {"x": 13, "y": 14}]
)
トランザクションとコミット
SQLAlchemyは2つのコミット方法を提供しています。1つは手動のcommit
、もう1つは半自動のcommit
です。公式ドキュメントではengine.begin()
の使用が推奨されています。各行ごとにコミットする完全自動のautocommit
方法もありますが、これは推奨されません。
# "commit as you go"は手動でコミットする必要があります
with engine.connect() as conn:
conn.execute(text("CREATE TABLE some_table (x int, y int)"))
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[{"x": 1, "y": 1}, {"x": 2, "y": 4}]
)
conn.commit() # ここでコミットすることに注意してください
# "begin once"半自動コミット
with engine.begin() as conn:
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[{"x": 6, "y": 8}, {"x": 9, "y": 10}]
)
ORM
Session
Session
はスレッドセーフではありません。しかし、一般的にWebフレームワークは各リクエストの開始時にsession
を取得するため、問題にはなりません。
from sqlalchemy.orm import Session
with Session(engine) as session:
session.add(foo)
session.commit()
# sessionmakerを使用してファクトリ関数を作成することもできます。これにより、毎回パラメータを入力する必要がなくなります
from sqlalchemy.orm import sessionmaker
new_session = sessionmaker(engine)
with new_session() as session:
...
Declarative API
-
__tablename__
を使用してデータベースのテーブル名を指定します。 -
Mapped
とネイティブ型を使用して各フィールドを宣言します。 -
Integer
、String
などを使用してフィールドの型を指定します。 -
index
パラメータを使用してインデックスを指定します。 -
unique
パラメータを使用して一意のインデックスを指定します。 -
__table_args__
を使用して他の属性を指定します。例えば、複合インデックスなどです。
from datetime import datetime
from sqlalchemy import Integer, String, func, UniqueConstraint
from sqlalchemy.orm import relationship, mapped_column, Mapped
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
# タプルである必要があり、リストではないことに注意してください
__table_args__ = (UniqueConstraint("name", "time_created"),)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = mapped_column(String(30), index=True)
fullname: Mapped[str] = mapped_column(String, unique=True)
# 特に大きなフィールドの場合、deferredを使用することもできます。これにより、このフィールドはデフォルトではロードされません
description: Mapped[str] = mapped_column(Text, deferred=True)
# デフォルト値。関数を渡すことに注意してください。現在の時刻ではありません
time_created: Mapped[datetime] = mapped_column(DateTime(Timezone=True), default=datetime.now)
# または、サーバーのデフォルト値を使用することもできます。ただし、これはテーブル作成時に設定する必要があり、テーブルのスキーマの一部になります
time_created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
time_updated: Mapped[datetime] = mapped_column(DateTime(timezone=True), onupdate=func.now())
class Address(Base):
__tablename__ = "address"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
email_address: Mapped[str] = mapped_column(String, nullable=False)
# create_allを呼び出してすべてのモデルを作成します
Base.metadata.create_all(engine)
# 1つのモデルだけを作成する場合
User.__table__.create(engine)
外部キー
relationship
を使用してモデル間の関連関係を指定します。
1対多関係の双方向マッピング
from sqlalchemy import create_engine, Integer, String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, relationship, Session, Mapped, mapped_column
class Group(Base):
__tablename__ = 'groups'
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = mapped_column(String)
# 対応する複数のユーザー。ここではモデル名をパラメータとして使用します
members = relationship('User')
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
# group_idはデータベース内の実際の外部キー名で、2番目のフィールドForeignKeyは対応するIDを指定するために使用されます
group_id = Column(Integer, ForeignKey('groups.id'))
# モデル内の対応するグループフィールド。対応するモデル内のどのフィールドと重複するかを宣言する必要があります
group = relationship('Group', overlaps="members")
多対多マッピングには、関連テーブルが必要です
# 関連テーブル
class UserPermissions(Base):
__tablename__ = 'user_permissions'
id: Mapped[int] = mapped_column(Integer, primary_key=True)
# 外部キーを使用して外部キーを指定することもできます
user_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id'))
permission_id: Mapped[str] = mapped_column(String, ForeignKey('permissions.id'))
class User(Base):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = Column(String)
# secondaryを使用して関連テーブルを指定し、overlapsを使用してモデル内の対応するフィールドを指定します
permissions = relationship('Permission', secondary="user_permissions", overlaps="users")
class Permission(Base):
__tablename__ = 'permissions'
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = Column(String)
# 上記と同じ
users = relationship('User', secondary="user_permissions", overlaps="permissions")
user1 = User(name='user1', group_id=1)
user2 = User(name='user2')
group1 = Group(name='group1')
group2 = Group(name='group2', members=[user2])
permission1 = Permission(name="open_file")
permission2 = Permission(name="save_file")
user1.permissions.append(permission1)
db.add_all([user1, user2, group1, group2, permission1, permission2])
db.commit()
print(user1.permissions[0].id)
他の多くのチュートリアルでは、backref
を使用して対応するモデルの属性を生成しています。ここでは、対応するモデルでアクセス可能な属性を明示的に宣言することが好まれます。
CRUD
1.x APIとは異なり、2.0 APIではquery
は使用されず、select
を使用してデータをクエリします。
from sqlalchemy import select
# whereのパラメータは`==`で構成される式です。利点は、コードを書く際にスペルミスが検出されることです
stmt = select(User).where(User.name == "john").order_by(User.id)
# filter_byは**kwargsをパラメータとして使用します
stmt = select(User).filter_by(name="some_user")
# order_byはUser.id.desc()を使用して逆順ソートを表すこともできます
result = session.execute(stmt)
# 一般的に、オブジェクト全体を選択する場合、scalarsメソッドを使用する必要があります。そうしないと、1つのオブジェクトを含むタプルが返されます
for user in result.scalars():
print(user.name)
# モデルの単一の属性をクエリする場合、scalarsを使用する必要はありません
result = session.execute(select(User.name))
for row in result:
print(row.name)
# idでクエリするショートカットもあります
user = session.get(User, pk=1)
# データを更新するには、update文を使用する必要があります
from sqlalchemy import update
# synchronize_sessionには3つのオプションがあります:false、"fetch"、"evaluate"。デフォルトはevaluateです
# falseはPython内のオブジェクトをまったく更新しないことを意味します
# fetchはデータベースからオブジェクトを再ロードすることを意味します
# evaluateはデータベー
ベースを更新すると同時に、Python内のオブジェクトにもできるだけ同じ操作を試みることを意味します
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)
# または、属性に直接値を割り当てることもできます
user.name = "John"
session.commit()
# ここには競合状態(レースコンディション)が発生する可能性がある箇所があります
# 間違い! 2つのプロセスが同時にこの値を更新すると、1つの値しか更新されない可能性があります。
# 両方が自分たちが正しいと思う値(2)を割り当てますが、実際の正しい値は1 + 1 + 1 = 3です
# 対応するSQL:Update users set visit_count = 2 where user.id = 1
user.visit_count += 1
# 正しいアプローチ:大文字のUに注意してください。つまり、モデルの属性を使用し、生成されるSQLはSQLサーバー側で1を加算するものです
# 対応するSQL:Update users set visit_count = visit_count + 1 where user.id = 1
user.visit_count = User.visit_count + 1
# オブジェクトを追加するには、直接session.addメソッドを使用します
session.add(user)
# またはadd_all
session.add_all([user1, user2, group1])
# 挿入されたIDを取得したい場合は、もちろんコミット後に読み取ることもできます
session.flush() # flushはコミットではなく、トランザクションはまだコミットされていません。繰り返し読み取り可能である必要があり、これはデータベースの分離レベルに関係します
print(user.id)
# 削除するには、session.deleteを使用します
session.delete(user)
関連モデルの読み込み
N件のレコードのリストを読み込んだ後、それぞれの項目の具体的な値をデータベースから1つずつ読み込むと、N + 1回のクエリが発生します。これはデータベースで最も一般的な間違いであるN + 1問題です。
デフォルトでは、外部キー関連モデルはクエリで読み込まれません。selectinload
オプションを使用して外部キーを読み込むことで、N + 1問題を回避できます。
# 外部キーを読み込まない場合
session.execute(select(User)).scalars().all()
# 外部キーを読み込む場合
session.execute(select(User).options(selectinload(User.groups))).scalars().all()
Selectinload
の原理はselect in
サブクエリを使用することです。selectinload
の他に、従来のjoinedload
も使用できます。その原理は最も一般的な結合テーブル
です。
# joinedloadを使用して外部キーを読み込みます。2.0で指定されているuniqueメソッドを使用する必要があることに注意してください
session.execute(select(User).options(joinedload(User.groups))).unique().scalars().all()
2.0では、joinedload
ではなくselectinload
の使用が推奨されます。一般的にselectinload
の方が良く、unique
を使用する必要もありません。
外部キーの操作
SQLAlchemyでは、外部キーを配列のように直接操作することができます。
user.permissions.append(open_permission) # 追加
user.permissions.remove(save_permission) # 削除
# すべての外部キーをクリアする
user.permissions.clear()
user.permissions = []
JSONフィールドの特殊な扱い
現在、ほとんどのデータベースはJSONフィールドをサポートしています。SQLAlchemyでは、フィールドからJSONオブジェクトを直接読み取ったり、JSONオブジェクトを書き込んだりすることができます。ただし、このJSONオブジェクトに直接update
を実行してデータベースに書き戻すことを期待してはいけません。これは信頼できません。必ずコピー、読み書きを行ってから再割り当てしてください。
import copy
article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()
バッチ挿入
大量のデータを挿入する必要がある場合、1つずつ挿入する方法を使用すると、データベースとのやり取りに多くの時間が費やされ、効率が非常に低くなります。MySQLなどのほとんどのデータベースはinsert ... values (...), (...) ...
というバッチ挿入APIを提供しており、これをSQLAlchemyでもうまく利用することができます。
# session.bulk_save_objects(...)を使用して複数のオブジェクトを直接挿入します
from sqlalchemy.orm import Session
s = Session()
objects = [
User(name="u1"),
User(name="u2"),
User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()
# bulk_insert_mappingsを使用すると、オブジェクトを作成するオーバーヘッドを節約し、辞書を直接挿入することができます
users = [
{"name": "u1"},
{"name": "u2"},
{"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()
# bulk_update_mappingsを使用すると、オブジェクトをバッチで更新することができます。辞書内のidがwhere条件として使用され、他のすべてのフィールドが更新に使用されます
session.bulk_update_mappings(User, users)
DeclarativeBase
Pythonのネイティブ型システムを完全に受け入れる
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
from sqlalchemy.orm import mapped_column, MappedColumn
id: Mapped[int] = mapped_column(Integer, primary_key=True)
fullname: Mapped[Optional[str]]
Asyncio
1タスクにつき1つのAsyncSession
。AsyncSession
オブジェクトは、進行中の単一のステートフルなデータベーストランザクションを表す、可変でステートフルなオブジェクトです。asyncio
を使用して並行タスクを実行する場合、例えばasyncio.gather()
のようなAPIを使用する場合、各個別のタスクは別々のAsyncSession
を使用する必要があります。
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
engine = create_async_engine(url, echo=True)
session = async_sessionmaker(engine)
# オブジェクトを作成する
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# データを挿入する
async with session() as db:
db.add(...)
await db.commit()
# データをクエリする
async with session() as db:
stmt = select(A)
row = await db.execute(stmt)
for obj in row.scalars():
print(obj.id)
await engine.dispose()
マルチプロセス環境での使用
Pythonのグローバルインタープリターロック(GIL)のため、マルチコアプロセッサを活用するにはマルチプロセッシングを使用する必要があります。マルチプロセッシング環境では、リソースを共有することができません。SQLAlchemyに関しては、接続プールを共有することができません。この問題を手動で解決する必要があります。
一般的に、複数のプロセス間で同じSession
を共有しようとするのは避けるべきです。各プロセスの初期化時にSession
を作成するのが最善です。
値が設定されている場合のみwhere条件を追加する
URLでは、ユーザーが指定したオプションに応じて対応する結果を返す必要があることがよくあります。
query = select(User)
if username is not None:
query = query.where(User.username == username)
if password is not None:
query = query.where(User.password == password)
Leapcell: The Next - Gen Serverless Platform for Web Hosting, Async Tasks, and Redis
最後に、Pythonサービスをデプロイするのに最適なプラットフォームをおすすめします:Leapcell
1. 多言語サポート
- JavaScript、Python、Go、またはRustで開発できます。
2. 無制限のプロジェクトを無料でデプロイ
- 使用量に応じて課金されます — リクエストがなければ料金はかかりません。
3. 比類のないコスト効率
- 従量制課金で、アイドル時の料金はかかりません。
- 例:平均応答時間60msで694万回のリクエストを$25でサポートします。
4. シンプルな開発者体験
- 直感的なUIで簡単にセットアップできます。
- 完全自動化されたCI/CDパイプラインとGitOps統合。
- アクション可能な洞察を得るためのリアルタイムメトリクスとロギング。
5. 簡単なスケーラビリティと高いパフォーマンス
- 高い同時接続を簡単に処理するための自動スケーリング。
- 運用オーバーヘッドがゼロ — 構築に集中できます。
Leapcell Twitter: https://x.com/LeapcellHQ