はじめに
SQLAlchemy はPythonにおけるORマッパーとして広く使われているライブラリです。柔軟なクエリを書けるため、Pythonをバックエンドに利用する際にはその記法を覚えておくと重宝します。
また SQLModelはSQLAlchemyのラッパーライブラリで、pydanticのBaseModelと同様の形式でデータモデルが作れるほか、導入のドキュメントが非常にわかりやすく、SQLAlchemy単体で利用するよりも簡単にデータベースとの連携が実現できます。
一方でSQLModelだけでは足りない部分も多いため、SQLAlchemyのAPIを利用せざるを得ないケースは多いですが、非常に多機能が故にドキュメントの情報量が多く、書きたいクエリを実現するための情報にたどり着くのが難しい場合があります。
そのため今回はPostgreSQLの基本的なクエリを取り上げ、SQLAlchemyの記法でそれを実現する方法をいくつか紹介します。
環境
- Python 3.11.4
- SQLAlchemy 1.4.41
- PostgreSQL 15.3
- sqlmodel 0.0.8
前提
SQLAlchemyをORMとして利用する際には session.query
を利用するのが一般的ですが、今回は sqlalchemy.select
関数を利用します。以下のように使います。
from sqlalchemy import select
stmt = (
select(
User.name.label("user_name"),
User.age.label("user_age"),
Group.name.lable("group_name")
)
.select_from(User)
.join(Group, Group.group_id == User.group_id)
.filter(
User.age >= 18
)
)
users = session.execute(stmt).all()
-
select
の引数にはアクセスしたいフィールドを書きます。-
label
を利用すると、クエリ結果のフィールド名を上書きすることができます。
-
-
select_from
はFROM句に対応します。アクセスしたいテーブルのモデルクラスを与えます。 -
join
はJOIN句に対応します。第二引数に結合の条件を与えます。 -
filter
はWHERE句に対応します。抽出条件を与えます。
以上のように selectを使ってクエリを組み立てていきますが、次節以降で PostgreSQLの基本的な構文を取り上げ、それをSQLAlchemyにて実装していきます。
型のキャスト
テーブルに定義されている型を変換して返したいケースは多々あると思います。例として以下のようなテーブルを考えます。
class User(SQLModel, table=True):
__tablename__ = "user"
user_id: int | None = Field(default=None, primary_key=True)
name: str = Field(nullable=False, unique=True)
created_at: datetime | None = Field(
sa_column=sa.Column(
sa.TIMESTAMP(timezone=True), nullable=True, default=sa.func.now()
)
)
create_at
はデータベース上では timestamp with timezone
型として定義されます。
この created_at
において、日付部分だけを取得する、つまり datetime型をdate型に変換して返すケースを考えます。
SQLの場合
SELECT
user.created_at::DATE
FROM
user;
SQLAlchemyの場合
from sqlalchemy.dialects.postgresql import DATE
stmt = (
select(
User.created_at.cast(DATE)
)
.select_from(User)
)
上記のように、フィールドに cast
関数を適用することで型変換が可能です。castの引数には SQLAlchemyの型変数を与えます。今回は dialectsモジュールの DATEを利用していますが、ジェネリックな型変数でもキャスト可能です。
RANGE型を使う
postgresに用意されている RANGE型は、その名のとおり値の範囲を定義する型です。RANGEの派生である DATERANGE
型は、例えばなにかの有効期限を範囲としてデータに持たせたいときなどに便利です。
RANGE型へのキャスト
さきほどの userテーブルにおける created_atをDATERANGEにキャストしてみます。(始点をcreated_at, 終点を本日の日付とします。)
SQLの場合
SELECT
DATERANGE (
user.created_at::DATE,
CURRENT_DATE,
'[]'
) AS effective
FROM ...
DATERANGE関数を使っています。第三引数は、範囲の端点の扱いを定義します。(
は開区間を、[
は閉区間を表します。
SQLAlchemyの場合
from sqlalchemy import func
stmt = (
select(
func.daterange(
user.created_at.cast(DATE)
func.current_date(),
"[]",
).label("effective"),
)
)
sqlalchemy.func
から daterange関数を呼び出すことができ、その使い勝手はSQLの場合とほぼ変わりません。
範囲演算子を利用してフィルタする
postgresの範囲演算子を利用すると、例えば「ある範囲が値を含む」「ある範囲と別の範囲で共通領域をもつ」といった条件で検索結果をフィルタリングできます。
SQLの場合
DATERANGEどうしで共通領域をもつデータのみを取り出すクエリを書いてみます。共通領域の有無を判定する演算子は &&
です。
SELECT * FROM some_table
WHERE
some_table.a_daterange && b_daterange;
SQLAlchemyの場合
stmt = (
select(SomeTable)
.select_from(SomeTable)
.filter(
SomeTable.c.a_daterange.op("&&")(
fuct.daterange(...)
)
)
)
範囲型をもつフィールドに対し、op
というメソッドを呼び出すことができます。opの引数は演算子の記号です。
ウィンドウ関数を使う
累積和をとるときなど、ウィンドウ関数を使うのが便利です。以下のようなテーブルを例に取ります。
class Sales(SQLModel, table=True):
__tablename__ = "sales"
id: int | None = Field(None, primary_key=True)
product_id: int = Field(foreign_key="...", nullable=False)
count: int = Field(..., nullable=False)
client_id: int = Field(...)
sold_at: datetime = Field(...)
あるプロダクトが売れた個数を格納するテーブルです。このテーブルに対し、プロダクトIDごとに累積和を求めてみます。
SQLの場合
SELECT
sales.id,
sales.count,
SUM(sales.count) OVER (
PARTITION BY sales.procuct_id
ORDER BY sales.sold_at
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS cumulative_count
FROM
processing_part;
ORDER BY
に指定したフィールドでソートされ、PARTITION BYで区切られたプロダクトIDごとに累積和が求められます。ROWS BETWEEN
はフレームサイズの定義です。ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
で1つ前の行~現在の行になります。
SQLAlchemyの場合
stmt = (
select(
Sales.id,
Sales.count,
func.sum(Sales.count).over(
partition_by=Sales.product_id,
order_by=[Order.deadline, Process.estimate],
rows=(None, 0),
).label("cumulative_count")
)
.select_from(Sales)
)
こちらも funcを使って、sum関数を呼び出します。over
の引数は公式ドキュメントに詳しく記載してあります。
JSON関数を使う
GROUP BYでまとめられたデータの集合をJSON配列として返したいときには、JSON関数を使うと便利です。さきほどのSalesテーブルにおいて、プロダクトIDごとにまとめてJSONにしてみます。
SQLの場合
SELECT
sales.product_id,
JSON_AGG (
JSON_BUILD_OBJECT (
'id',
sales.id,
'count',
sales.count,
'client_id',
sales.client_id
)
ORDER BY
sales.sold_at
) AS records
FROM
sales
GROUP BY
sales.product_id;
JSON_BUILD_OBJECT
でjson_objectを定義し、JSON_AGG
でそれを配列形式で集約します。また、ORDER BY
は集約時のソート順を決めます。この設定により、json配列内のオブジェクトがsold_atの小さい順で配列に格納されます。
SQLAlchemyの場合
from sqlalchemy.dialects.postgresql import aggregate_order_by
stmt = (
select(
Sales.product_id,
func.json_agg(
aggregate_order_by(
func.json_build_object(
'id',
Sales.id,
'count',
Sales.count,
'client_id',
Sales.client_id
),
*[Sales.sold_at],
),
)
)
.select_from(Sales)
.group_by(Sales.product_id)
)
こちらの今までと同様に funcを利用し、各関数にアクセスすることで同じ記法でクエリを書くことができます。配列内のオブジェクトを任意の順序でソートするためには、postgresのdialectモジュールから aggregate_order_by
を使用する必要があります。こちらの第二引数にソートの条件を渡してください。
おわりに
上記のようにいくつかのパターンでPostgresの基本的なクエリをSQLAlchemyで実現する方法を紹介しました。基本的には sqlalchemy.func
から対象の関数を呼び出せば、同じ記法が利用できると考えて良さそうです。こちらの記事が少しでも役立てば幸いです。