3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SQLAlchemy 2.0:史上最高のPython ORM

Posted at

Group68.png

Leapcell: The Next - Gen Serverless Platform for Web Hosting, Async Tasks, and Redis

SQLAlchemyチュートリアル

SQLAlchemyはPythonエコシステムで最も人気のあるオブジェクトリレーショナルマッピング(ORM)です。エレガントな設計があり、下層のCoreと上層の伝統的なORMの2つの部分に分かれています。Pythonや他の言語の多くのORMでは、良い階層設計が実装されていません。例えば、DjangoのORMでは、データベース接続とORM自体が完全に混ざっています。

SQLAlchemy.svg.png

なぜCoreが必要なのか?

Core層は主にクライアント接続プールを実装しています。現代のWebアプリケーションの中核として、リレーショナルデータベースの同時接続能力は一般的に強くありません。大量の短い接続を使用することは推奨されず、ほとんどの場合、接続プールが必要です。接続プールには大まかに2種類あります。

  • サーバー側接続プール:専用の接続プールミドルウェアで、短い接続のたびに再利用するための長い接続を割り当てます。
  • クライアント側接続プール:一般的にサードパーティライブラリとしてコードに導入されます。

SQLAlchemyの接続プールはクライアント側接続プールに属します。この接続プールでは、SQLAlchemyは一定数の長い接続を維持しています。connectが呼び出されると、実際にはプールから接続を取得し、closeが呼び出されると、実際には接続をプールに返します。

接続の作成

SQLAlchemyでは、create_engineを使用して接続(プール)を作成します。create_engineのパラメータはデータベースのURLです。

from sqlalchemy import create_engine

# MySQL接続の例
engine = create_engine(
    "mysql://user:password@localhost:3306/dbname",
    echo=True,  # echoをTrueに設定すると、実際に実行されるSQLが出力され、デバッグが容易になります
    future=True,  # SQLAlchemy 2.0 APIを使用します。これは下位互換性があります
    pool_size=5,  # 接続プールのサイズはデフォルトで5です。0に設定すると、接続に制限はありません
    pool_recycle=3600  # データベースの自動切断を制限する時間を設定します
)

# メモリ内のSQLiteデータベースを作成します。check_same_thread=Falseを追加する必要があります。そうしないと、マルチスレッド環境で使用できません
engine = create_engine("sqlite:///:memory:", echo=True, future=True,
                       connect_args={"check_same_thread": False})

# MySQLに接続する別の方法
# pip install mysqlclient
engine = create_engine('mysql+mysqldb://user:password@localhost/foo?charset=utf8mb4')

Core層 — 直接SQLを使用する

CRUD

from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(text("select * from users"))
    print(result.all())

# 結果は反復可能で、各行の結果はRowオブジェクトです
for row in result:
    # Rowオブジェクトは3つのアクセス方法をサポートします
    print(row.x, row.y)
    print(row[0], row[1])
    print(row["x"], row["y"])

# パラメータを渡すには、`:var`を使用します
result = conn.execute(
    text("SELECT x, y FROM some_table WHERE y > :y"),
    {"y": 2}
)

# パラメータを事前にコンパイルすることもできます
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

# 挿入するときは、複数行を直接挿入することができます
conn.execute(
    text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
    [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
)

トランザクションとコミット

SQLAlchemyは2つのコミット方法を提供しています。1つは手動のcommit、もう1つは半自動のcommitです。公式ドキュメントではengine.begin()の使用が推奨されています。各行ごとにコミットする完全自動のautocommit方法もありますが、これは推奨されません。

# "commit as you go"は手動でコミットする必要があります
with engine.connect() as conn:
    conn.execute(text("CREATE TABLE some_table (x int, y int)"))
    conn.execute(
        text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
        [{"x": 1, "y": 1}, {"x": 2, "y": 4}]
    )
    conn.commit()  # ここでコミットすることに注意してください

# "begin once"半自動コミット
with engine.begin() as conn:
    conn.execute(
        text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
        [{"x": 6, "y": 8}, {"x": 9, "y": 10}]
    )

ORM

Session

Sessionはスレッドセーフではありません。しかし、一般的にWebフレームワークは各リクエストの開始時にsessionを取得するため、問題にはなりません。

from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(foo)
    session.commit()

# sessionmakerを使用してファクトリ関数を作成することもできます。これにより、毎回パラメータを入力する必要がなくなります
from sqlalchemy.orm import sessionmaker
new_session = sessionmaker(engine)

with new_session() as session:
    ...

Declarative API

  • __tablename__を使用してデータベースのテーブル名を指定します。
  • Mappedとネイティブ型を使用して各フィールドを宣言します。
  • IntegerStringなどを使用してフィールドの型を指定します。
  • indexパラメータを使用してインデックスを指定します。
  • uniqueパラメータを使用して一意のインデックスを指定します。
  • __table_args__を使用して他の属性を指定します。例えば、複合インデックスなどです。
from datetime import datetime
from sqlalchemy import Integer, String, func, UniqueConstraint
from sqlalchemy.orm import relationship, mapped_column, Mapped
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    # タプルである必要があり、リストではないことに注意してください
    __table_args__ = (UniqueConstraint("name", "time_created"),)
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(30), index=True)
    fullname: Mapped[str] = mapped_column(String, unique=True)
    # 特に大きなフィールドの場合、deferredを使用することもできます。これにより、このフィールドはデフォルトではロードされません
    description: Mapped[str] = mapped_column(Text, deferred=True)
    # デフォルト値。関数を渡すことに注意してください。現在の時刻ではありません
    time_created: Mapped[datetime] = mapped_column(DateTime(Timezone=True), default=datetime.now)
    # または、サーバーのデフォルト値を使用することもできます。ただし、これはテーブル作成時に設定する必要があり、テーブルのスキーマの一部になります
    time_created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    time_updated: Mapped[datetime] = mapped_column(DateTime(timezone=True), onupdate=func.now())


class Address(Base):
    __tablename__ = "address"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    email_address: Mapped[str] = mapped_column(String, nullable=False)

# create_allを呼び出してすべてのモデルを作成します
Base.metadata.create_all(engine)

# 1つのモデルだけを作成する場合
User.__table__.create(engine)

外部キー

relationshipを使用してモデル間の関連関係を指定します。

1対多関係の双方向マッピング

from sqlalchemy import create_engine, Integer, String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, relationship, Session, Mapped, mapped_column

class Group(Base):
    __tablename__ = 'groups'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String)
    # 対応する複数のユーザー。ここではモデル名をパラメータとして使用します
    members = relationship('User')

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    # group_idはデータベース内の実際の外部キー名で、2番目のフィールドForeignKeyは対応するIDを指定するために使用されます
    group_id = Column(Integer, ForeignKey('groups.id'))
    # モデル内の対応するグループフィールド。対応するモデル内のどのフィールドと重複するかを宣言する必要があります
    group = relationship('Group', overlaps="members")

多対多マッピングには、関連テーブルが必要です

# 関連テーブル
class UserPermissions(Base):
    __tablename__ = 'user_permissions'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # 外部キーを使用して外部キーを指定することもできます
    user_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id'))
    permission_id: Mapped[str] = mapped_column(String, ForeignKey('permissions.id'))

class User(Base):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = Column(String)
    # secondaryを使用して関連テーブルを指定し、overlapsを使用してモデル内の対応するフィールドを指定します
    permissions = relationship('Permission', secondary="user_permissions", overlaps="users")

class Permission(Base):
    __tablename__ = 'permissions'
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = Column(String)
    # 上記と同じ
    users = relationship('User', secondary="user_permissions", overlaps="permissions")


user1 = User(name='user1', group_id=1)
user2 = User(name='user2')
group1 = Group(name='group1')
group2 = Group(name='group2', members=[user2])
permission1 = Permission(name="open_file")
permission2 = Permission(name="save_file")
user1.permissions.append(permission1)

db.add_all([user1, user2, group1, group2, permission1, permission2])

db.commit()

print(user1.permissions[0].id)

他の多くのチュートリアルでは、backrefを使用して対応するモデルの属性を生成しています。ここでは、対応するモデルでアクセス可能な属性を明示的に宣言することが好まれます。

CRUD

1.x APIとは異なり、2.0 APIではqueryは使用されず、selectを使用してデータをクエリします。

from sqlalchemy import select

# whereのパラメータは`==`で構成される式です。利点は、コードを書く際にスペルミスが検出されることです
stmt = select(User).where(User.name == "john").order_by(User.id)
# filter_byは**kwargsをパラメータとして使用します
stmt = select(User).filter_by(name="some_user")
# order_byはUser.id.desc()を使用して逆順ソートを表すこともできます

result = session.execute(stmt)

# 一般的に、オブジェクト全体を選択する場合、scalarsメソッドを使用する必要があります。そうしないと、1つのオブジェクトを含むタプルが返されます
for user in result.scalars():
    print(user.name)

# モデルの単一の属性をクエリする場合、scalarsを使用する必要はありません
result = session.execute(select(User.name))
for row in result:
    print(row.name)

# idでクエリするショートカットもあります
user = session.get(User, pk=1)

# データを更新するには、update文を使用する必要があります
from sqlalchemy import update
# synchronize_sessionには3つのオプションがあります:false、"fetch"、"evaluate"。デフォルトはevaluateです
# falseはPython内のオブジェクトをまったく更新しないことを意味します
# fetchはデータベースからオブジェクトを再ロードすることを意味します
# evaluateはデータベー
ベースを更新すると同時にPython内のオブジェクトにもできるだけ同じ操作を試みることを意味します
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)

# または、属性に直接値を割り当てることもできます
user.name = "John"
session.commit()

# ここには競合状態(レースコンディション)が発生する可能性がある箇所があります
# 間違い! 2つのプロセスが同時にこの値を更新すると、1つの値しか更新されない可能性があります。
# 両方が自分たちが正しいと思う値(2)を割り当てますが、実際の正しい値は1 + 1 + 1 = 3です
# 対応するSQL:Update users set visit_count = 2 where user.id = 1
user.visit_count += 1
# 正しいアプローチ:大文字のUに注意してください。つまり、モデルの属性を使用し、生成されるSQLはSQLサーバー側で1を加算するものです
# 対応するSQL:Update users set visit_count = visit_count + 1 where user.id = 1
user.visit_count = User.visit_count + 1

# オブジェクトを追加するには、直接session.addメソッドを使用します
session.add(user)
# またはadd_all
session.add_all([user1, user2, group1])

# 挿入されたIDを取得したい場合は、もちろんコミット後に読み取ることもできます
session.flush()  # flushはコミットではなく、トランザクションはまだコミットされていません。繰り返し読み取り可能である必要があり、これはデータベースの分離レベルに関係します
print(user.id)

# 削除するには、session.deleteを使用します
session.delete(user)

関連モデルの読み込み

N件のレコードのリストを読み込んだ後、それぞれの項目の具体的な値をデータベースから1つずつ読み込むと、N + 1回のクエリが発生します。これはデータベースで最も一般的な間違いであるN + 1問題です。

デフォルトでは、外部キー関連モデルはクエリで読み込まれません。selectinloadオプションを使用して外部キーを読み込むことで、N + 1問題を回避できます。

# 外部キーを読み込まない場合
session.execute(select(User)).scalars().all()
# 外部キーを読み込む場合
session.execute(select(User).options(selectinload(User.groups))).scalars().all()

Selectinloadの原理はselect inサブクエリを使用することです。selectinloadの他に、従来のjoinedloadも使用できます。その原理は最も一般的な結合テーブルです。

# joinedloadを使用して外部キーを読み込みます。2.0で指定されているuniqueメソッドを使用する必要があることに注意してください
session.execute(select(User).options(joinedload(User.groups))).unique().scalars().all()

2.0では、joinedloadではなくselectinloadの使用が推奨されます。一般的にselectinloadの方が良く、uniqueを使用する必要もありません。

外部キーの操作

SQLAlchemyでは、外部キーを配列のように直接操作することができます。

user.permissions.append(open_permission)  # 追加
user.permissions.remove(save_permission)  # 削除
# すべての外部キーをクリアする
user.permissions.clear()
user.permissions = []

JSONフィールドの特殊な扱い

現在、ほとんどのデータベースはJSONフィールドをサポートしています。SQLAlchemyでは、フィールドからJSONオブジェクトを直接読み取ったり、JSONオブジェクトを書き込んだりすることができます。ただし、このJSONオブジェクトに直接updateを実行してデータベースに書き戻すことを期待してはいけません。これは信頼できません。必ずコピー、読み書きを行ってから再割り当てしてください。

import copy
article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()

バッチ挿入

大量のデータを挿入する必要がある場合、1つずつ挿入する方法を使用すると、データベースとのやり取りに多くの時間が費やされ、効率が非常に低くなります。MySQLなどのほとんどのデータベースはinsert ... values (...), (...) ...というバッチ挿入APIを提供しており、これをSQLAlchemyでもうまく利用することができます。

# session.bulk_save_objects(...)を使用して複数のオブジェクトを直接挿入します
from sqlalchemy.orm import Session

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

# bulk_insert_mappingsを使用すると、オブジェクトを作成するオーバーヘッドを節約し、辞書を直接挿入することができます
users = [
    {"name": "u1"},
    {"name": "u2"},
    {"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()

# bulk_update_mappingsを使用すると、オブジェクトをバッチで更新することができます。辞書内のidがwhere条件として使用され、他のすべてのフィールドが更新に使用されます
session.bulk_update_mappings(User, users)

DeclarativeBase

Pythonのネイティブ型システムを完全に受け入れる

from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
    pass
from sqlalchemy.orm import mapped_column, MappedColumn
id: Mapped[int] = mapped_column(Integer, primary_key=True)
fullname: Mapped[Optional[str]]

Asyncio

1タスクにつき1つのAsyncSessionAsyncSessionオブジェクトは、進行中の単一のステートフルなデータベーストランザクションを表す、可変でステートフルなオブジェクトです。asyncioを使用して並行タスクを実行する場合、例えばasyncio.gather()のようなAPIを使用する場合、各個別のタスクは別々のAsyncSessionを使用する必要があります。

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

engine = create_async_engine(url, echo=True)
session = async_sessionmaker(engine)

# オブジェクトを作成する
async with engine.begin() as conn:
       await conn.run_sync(Base.metadata.create_all)

# データを挿入する
async with session() as db:
    db.add(...)
    await db.commit()

# データをクエリする
async with session() as db:
    stmt = select(A)
    row = await db.execute(stmt)
    for obj in row.scalars():
        print(obj.id)

await engine.dispose()

マルチプロセス環境での使用

Pythonのグローバルインタープリターロック(GIL)のため、マルチコアプロセッサを活用するにはマルチプロセッシングを使用する必要があります。マルチプロセッシング環境では、リソースを共有することができません。SQLAlchemyに関しては、接続プールを共有することができません。この問題を手動で解決する必要があります。

一般的に、複数のプロセス間で同じSessionを共有しようとするのは避けるべきです。各プロセスの初期化時にSessionを作成するのが最善です。

値が設定されている場合のみwhere条件を追加する

URLでは、ユーザーが指定したオプションに応じて対応する結果を返す必要があることがよくあります。

query = select(User)
if username is not None:
    query = query.where(User.username == username)
if password is not None:
    query = query.where(User.password == password)

Leapcell: The Next - Gen Serverless Platform for Web Hosting, Async Tasks, and Redis

最後に、Pythonサービスをデプロイするのに最適なプラットフォームをおすすめします:Leapcell

barndpic.png

1. 多言語サポート

  • JavaScript、Python、Go、またはRustで開発できます。

2. 無制限のプロジェクトを無料でデプロイ

  • 使用量に応じて課金されます — リクエストがなければ料金はかかりません。

3. 比類のないコスト効率

  • 従量制課金で、アイドル時の料金はかかりません。
  • 例:平均応答時間60msで694万回のリクエストを$25でサポートします。

4. シンプルな開発者体験

  • 直感的なUIで簡単にセットアップできます。
  • 完全自動化されたCI/CDパイプラインとGitOps統合。
  • アクション可能な洞察を得るためのリアルタイムメトリクスとロギング。

5. 簡単なスケーラビリティと高いパフォーマンス

  • 高い同時接続を簡単に処理するための自動スケーリング。
  • 運用オーバーヘッドがゼロ — 構築に集中できます。

Frame3-withpadding2x.png

ドキュメントで詳細を探索しましょう!

Leapcell Twitter: https://x.com/LeapcellHQ

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?