はじめに
本記事はPython Advent Calendar 2025のシリーズ4 13日目の記事になります。
SQLGlotを使ってSQLクエリからCRUD図を作ります。
下記classmethodさんの記事とstackoverflowから、参照テーブルとCRUD操作が分かりそうです。
用意したケースを試しながら、クエリからCRUD図を生成する実装を作っていきます。
単クエリを対象としたいので、sqlglot.parse ではなく、sqlglot.parse_oneを用います。
結論を急ぎたい場合は「ここまでを全て包括した実装」まで飛ばしてください。
バージョン
- python: 3.13
- SQLGlot: 28.3.x
単純なクエリ
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
tables = ast.find_all(exp.Table)
print(type(ast), {t.name for t in tables})
main("SELECT * FROM user;") # <class 'sqlglot.expressions.Select'> {'user'}
main("INSERT INTO user (id, name) VALUES (1, 'aaa');") # <class 'sqlglot.expressions.Insert'> {'user'}
main("UPDATE user SET name = 'bbb' WHERE id = 1;") # <class 'sqlglot.expressions.Update'> {'user'}
main("DELETE FROM user WHERE id = 1;") # <class 'sqlglot.expressions.Delete'> {'user'}
Select
サブクエリ
特に問題なさそうですね。
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
tables = ast.find_all(exp.Table)
print(type(ast), {t.name for t in tables})
select_subquery = """SELECT
order.*
FROM
order
JOIN (
SELECT
id
FROM
product
WHERE
is_sale IS TRUE
) AS sale_products
ON order.product_id = sale_products.id;
"""
main(select_subquery) # <class 'sqlglot.expressions.Select'> {'product', 'order'}
with句
CTE句の名前も取れちゃっていますね……
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
tables = ast.find_all(exp.Table)
print(type(ast), {t.name for t in tables})
with_select_query = """WITH sale_products AS (
SELECT
id
FROM
product
WHERE
is_sale is True
)
SELECT
order.*
FROM
order
JOIN sale_products ON order.product_id = sale_products.id;
"""
main(with_select_query) # <class 'sqlglot.expressions.Select'> {'order', 'cte_sale_products', 'product'}
sqlglot#metadataより、 find_all を使えばテーブル以外も抽出できそう。
あらかじめwith句のエイリアス名を出しておいて、除くことで対処します。
exp.Table のCTE句版を探したら2つ見つかりました。
わからないのでどっちも試します。
exp.CTE を用いて、 alias_or_name プロパティを使うと良さそうです。
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
print([cte_expr.name for cte_expr in ast.find_all(exp.CTE)])
# ['']
print([with_expr.name for with_expr in ast.find_all(exp.With)])
# ['']
print([cte_expr.alias_or_name for cte_expr in ast.find_all(exp.CTE)])
# ['cte_sale_products']
print([with_expr.alias_or_name for with_expr in ast.find_all(exp.With)])
# ['']
with_select_query = """WITH cte_sale_products AS (
SELECT
id
FROM
product
WHERE
is_sale is True
)
SELECT
order.*
FROM
order
JOIN cte_sale_products ON order.product_id = cte_sale_products.id;
"""
main(with_select_query)
exp.expression というのもありました。これを使うなら exp.With で取れる。
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
for cte_expr in ast.find_all(exp.With):
print([cte.alias for cte in cte_expr.expressions])
# ['cte_sale_products']
with_select_query = """WITH cte_sale_products AS (
SELECT
id
FROM
product
WHERE
is_sale is True
)
SELECT
order.*
FROM
order
JOIN cte_sale_products ON order.product_id = cte_sale_products.id;
"""
main(with_select_query)
しかしドキュメントを読んでも expression が何なのかよくわからないので見てみます。
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
for cte_expr in ast.find_all(exp.With):
print([cte for cte in cte_expr.expressions])
# [
# CTE(
# this=Select(
# expressions=[
# Column(
# this=Identifier(this=id, quoted=False)
# )
# ],
# from_=From(
# this=Table(
# this=Identifier(this=product, quoted=False)
# )
# ),
# where=Where(
# this=Is(
# this=Column(
# this=Identifier(this=is_sale, quoted=False)
# ),
# expression=Boolean(this=True)
# )
# )
# ),
# alias=TableAlias(
# this=Identifier(this=cte_sale_products, quoted=False)
# )
# )
# ]
for cte_expr in ast.find_all(exp.CTE):
print([cte for cte in cte_expr.expressions])
# []
with_select_query = """WITH cte_sale_products AS (
SELECT
id
FROM
product
WHERE
is_sale is True
)
SELECT
order.*
FROM
order
JOIN cte_sale_products ON order.product_id = cte_sale_products.id;
"""
main(with_select_query)
なるほど。
構文解析結果のオブジェクトっぽい。
深掘りはここまでにして、やりたいことに戻ります。
あらかじめ出したwith句のエイリアス名を除くことで対処できました。
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
cte_names = {cte_expr.alias_or_name for cte_expr in ast.find_all(exp.CTE)}
tables = ast.find_all(exp.Table)
print(type(ast), {t.name for t in tables if t.name not in cte_names})
with_select_query = """WITH cte_sale_products AS (
SELECT
id
FROM
product
WHERE
is_sale is True
)
SELECT
order.*
FROM
order
JOIN cte_sale_products ON order.product_id = cte_sale_products.id;
"""
main(with_select_query) # <class 'sqlglot.expressions.Select'> {'product', 'order'}
Insert
selectの取得結果からインサートするクエリ
型はinsertになっているけど、操作ごとにテーブルを分けていく必要があります。
他更新処理・削除処理をクエリには含められなくて、insert対象は1テーブルなので、対象テーブルとその他取得テーブルで分けて取れればできそうです。
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
cte_names = {cte_expr.alias_or_name for cte_expr in ast.find_all(exp.CTE)}
tables = ast.find_all(exp.Table)
print(type(ast), {t.name for t in tables if t.name not in cte_names})
qs = """INSERT INTO
order (user_id, product_id)
(
SELECT
user_id,
product_id
FROM
order_history
WHERE
id = 101 AND
user_id = 1
);
"""
main(qs) # <class 'sqlglot.expressions.Insert'> {'order', 'order_history'}
こうすることで上手くいきました。
insertでは、thisを取るとSchemaで返ってくる場合があるので、Schemaに対してもう一度thisを取ってあげることでTableが取れます。
サブクエリの型がSubqueryで取れていますが、selectと解釈しても問題ないでしょう。
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
cte_names = {cte_expr.alias_or_name for cte_expr in ast.find_all(exp.CTE)}
target = ast.this
if isinstance(target, exp.Schema):
target = target.this
if isinstance(target, exp.Table):
name = target.name
print(type(ast), name) # <class 'sqlglot.expressions.Insert'> order
tables = ast.expression.find_all(exp.Table)
print(type(ast.expression), {t.name for t in tables if t.name not in cte_names}) # <class 'sqlglot.expressions.Subquery'> {'order_history'}
qs = """INSERT INTO
order (user_id, product_id)
(
SELECT
user_id,
product_id
FROM
order_history
WHERE
id = 101 AND
user_id = 1
);
"""
main(qs)
selectの取得結果からインサートするクエリ(with句付き)
上記のwith句版です。
select対象を ast.expression で取っているため、with句内のテーブルが取ってこれません。
そこで、上記で深掘りした ast.find_all(exp.With) の出番です。
select側の型はもう exp.Select で決め打ちます。
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
cte_names = {cte_expr.alias_or_name for cte_expr in ast.find_all(exp.CTE)}
target = ast.this
if isinstance(target, exp.Schema):
target = target.this
if isinstance(target, exp.Table):
name = target.name
print(type(ast), name) # <class 'sqlglot.expressions.Insert'> order
select_table = set()
for with_expr in ast.find_all(exp.With):
for cte in with_expr.expressions:
select_table = select_table | {expr.name for expr in cte.find_all(exp.Table)}
tables = ast.expression.find_all(exp.Table)
select_table = select_table | {t.name for t in tables if t.name not in cte_names}
print(exp.Select, select_table) # <class 'sqlglot.expressions.Select'> {'order_history'}
qs = """WITH cte_order_history AS (
SELECT
user_id,
product_id
FROM
order_history
WHERE
id = 101 AND
user_id = 1
)
INSERT INTO
order (user_id, product_id)
(
SELECT
user_id,
product_id
FROM
cte_order_history
);
"""
main(qs)
Update, Delete
更新・削除対象のテーブルもselectしたものとして解釈します。
下記のようなケースを考えたとき、分岐が面倒なのでreadとしても捉えることにしたのが本音です。
UPDATE
customers
JOIN
orders ON customers.customer_id = orders.customer_id
SET
customers.status = orders.order_status
WHERE
customers.status = 'Active';
しかし敢えて理由をつけるとするならば、
往々に絞り込んで更新・削除する用途かと考えていて、絞り込み条件に含めるということは取得しているとも解釈できます。
そのためreadにもチェックをつけるようにしました。
実装
更新、削除どちらも下記実装で共通です。
下記実装は不完全です。
用意したケースが全て通る実装は後述します。
import sqlglot
from sqlglot import exp
def main(query):
ast = sqlglot.parse_one(query)
cte_names = {cte_expr.alias_or_name for cte_expr in ast.find_all(exp.CTE)}
target = ast.this
if isinstance(target, exp.Table):
name = target.name
print(type(ast), name)
tables = ast.find_all(exp.Table)
print(exp.Select, {t.name for t in tables if t.name not in cte_names})
すべて
更新
クエリ
UPDATE users u
SET u.name = 'aaa';
結果
# <class 'sqlglot.expressions.Update'> users
# <class 'sqlglot.expressions.Select'> {'users'}
削除
クエリ
DELETE FROM user;
結果
# <class 'sqlglot.expressions.Delete'> user
# <class 'sqlglot.expressions.Select'> {'user'}
絞り込み
更新
クエリ
UPDATE users u
SET u.name = 'aaa'
WHERE u.id = 1;
結果
# <class 'sqlglot.expressions.Update'> users
# <class 'sqlglot.expressions.Select'> {'users'}
削除
クエリ
DELETE FROM user u WHERE u.id = 1;
結果
# <class 'sqlglot.expressions.Delete'> user
# <class 'sqlglot.expressions.Select'> {'user'}
Joinで絞る
更新
クエリ
UPDATE
customers
JOIN
orders ON customers.customer_id = orders.customer_id
SET
customers.status = orders.order_status
WHERE
customers.status = 'Active';
結果
# <class 'sqlglot.expressions.Update'> customers
# <class 'sqlglot.expressions.Select'> {'customers', 'orders'}
削除
クエリ
DELETE FROM
customers
JOIN
orders ON customers.customer_id = orders.customer_id
WHERE
customers.status = 'Active';
結果
# <class 'sqlglot.expressions.Delete'> customers
# <class 'sqlglot.expressions.Select'> {'customers', 'orders'}
サブクエリで絞り込む
更新
クエリ
UPDATE
users u
SET u.last_ordered_at = (
SELECT MAX(o.ordered_at)
FROM orders o
WHERE o.user_id = u.id
);
結果
# <class 'sqlglot.expressions.Update'> users
# <class 'sqlglot.expressions.Select'> {'orders', 'users'}
削除
クエリ
DELETE FROM
users u
WHERE
u.status = 'inactive'
AND u.created_at < CURRENT_DATE - INTERVAL '365 days'
AND NOT EXISTS (
SELECT
1
FROM
orders o
WHERE
o.user_id = u.id
AND o.ordered_at >= CURRENT_DATE - INTERVAL '365 days'
);
結果
# <class 'sqlglot.expressions.Delete'> users
# <class 'sqlglot.expressions.Select'> {'orders', 'users'}
サブクエリ内にJoinが含まれる絞り込みでの更新
クエリ
UPDATE
users u
JOIN (
SELECT
o.user_id,
SUM(oi.quantity * oi.unit_price) AS total_amount
FROM
orders o
JOIN order_items oi ON oi.order_id = o.id
WHERE
o.status = 'completed'
GROUP BY
o.user_id
HAVING
SUM(oi.quantity * oi.unit_price) >= 100000
) t ON t.user_id = u.id
SET
u.rank = 'VIP'
WHERE
u.rank <> 'VIP';
結果
# <class 'sqlglot.expressions.Update'> users
# <class 'sqlglot.expressions.Select'> {'users', 'orders', 'order_items'}
With句の結果でJoinして絞り込んで更新
クエリ
WITH order_total AS (
SELECT
oh.order_id,
SUM(oi.quantity * oi.price) AS total_amount
FROM
order_history oh
JOIN order_items oi
ON oh.order_id = oi.order_id
GROUP BY
oh.order_id
)
UPDATE order_history AS oh
JOIN order_total AS ot
ON oh.order_id = ot.order_id
SET
oh.total_amount = ot.total_amount;
結果
# <class 'sqlglot.expressions.Update'> order_history
# <class 'sqlglot.expressions.Select'> {'order_items', 'order_history'}
joinした結果をDelete
下記のようなjoinした結果を削除するクエリでは、削除対象テーブルのエイリアスが含まれます。
エイリアスなし
クエリ
DELETE order_items
FROM
order_items
JOIN orders ON order_items.order_id = orders.id
WHERE
orders.status = 'canceled'
AND orders.ordered_at < CURRENT_DATE - INTERVAL 30 DAY;
結果
# <class 'sqlglot.expressions.Delete'> order_items
# <class 'sqlglot.expressions.Select'> {'order_items', 'orders'}
エイリアスあり
oi が取得対象に含まれてしまっています。
クエリ
DELETE oi
FROM
order_items oi
JOIN orders o ON oi.order_id = o.id
WHERE
o.status = 'canceled'
AND o.ordered_at < CURRENT_DATE - INTERVAL 30 DAY;
結果
# <class 'sqlglot.expressions.Delete'> order_items
# <class 'sqlglot.expressions.Select'> {'order_items', 'orders', 'oi'}
実装の変更
なので、一度エイリアスのマップを作成し、エイリアスがあれば置き換えるようにします。
これで想定通りになりました! (もちろん、ここまでのクエリもすべて想定通りに通ります)
def main(query):
ast = sqlglot.parse_one(query)
cte_names = {cte_expr.alias_or_name for cte_expr in ast.find_all(exp.CTE)}
target = ast.this
if isinstance(target, exp.Table):
name = target.name
print(type(ast), name)
tables = [t for t in ast.find_all(exp.Table) if t.name not in cte_names]
alias_table_map = {t.alias: t.name for t in tables if t.alias}
print(exp.Select, {alias_table_map.get(t.name, t.name) for t in tables})
qs = """DELETE oi
FROM
order_items oi
JOIN orders o ON oi.order_id = o.id
WHERE
o.status = 'canceled'
AND o.ordered_at < CURRENT_DATE - INTERVAL 30 DAY;"""
main(qs)
# <class 'sqlglot.expressions.Delete'> order_items
# <class 'sqlglot.expressions.Select'> {'order_items', 'orders'}
ここまでを全て包括した実装
ここまでを包括して、CRUDごとにテーブルを振り分ける実装を下記に載せます。
from collections import defaultdict
import sqlglot
from sqlglot import exp
class ExtractTableOperate:
CRUD_CREATE = "C"
CRUD_READ = "R"
CRUD_UPDATE = "U"
CRUD_DELETE = "D"
def __init__(self, dialect = "mysql"):
self._crud_map: dict[str, set[str]] = defaultdict(set)
self._cte_names: set[str] = set()
self.dialect = dialect
def extract(self, query: str):
self._reset_props()
ast = sqlglot.parse_one(query, read=self.dialect)
self._cte_names = self._cte_names | {cte_expr.alias_or_name for cte_expr in ast.find_all(exp.CTE)}
if isinstance(ast, exp.Insert):
self._handle_insert(ast)
elif isinstance(ast, exp.Update):
self._handle_update(ast)
elif isinstance(ast, exp.Delete):
self._handle_delete(ast)
elif isinstance(ast, exp.Select):
self._handle_select(ast)
return self._crud_map
def _reset_props(self):
self._crud_map = defaultdict(set)
self._cte_names = set()
def _mark_read_tables(
self,
expr: exp.Expression,
):
tables = [
t
for t in expr.find_all(exp.Table)
if (
t.name and
# CTE名を除去
t.name not in self._cte_names
)
]
# エイリアスの置き換え
alias_table_map = {t.alias: t.name for t in tables if t.alias}
for t in tables:
name = t.name
self._crud_map[alias_table_map.get(name, name)].add(self.CRUD_READ)
def _handle_select(self, stmt: exp.Select | exp.Subquery):
self._mark_read_tables(stmt)
def _handle_insert(self, stmt: exp.Insert):
target = stmt.this
if isinstance(target, exp.Schema):
target = target.this
if isinstance(target, exp.Table):
name = target.name
if name and name not in self._cte_names:
self._crud_map[name].add(self.CRUD_CREATE)
# INSERT INTO ... SELECT ... のREAD部分
if stmt.expression is not None:
self._handle_select(stmt.expression)
# with句内
for with_expr in stmt.find_all(exp.With):
for cte in with_expr.expressions:
self._handle_select(cte)
def _handle_update(self, stmt: exp.Update):
target = stmt.this
if isinstance(target, exp.Table):
name = target.name
if name and name not in self._cte_names:
self._crud_map[name].add(self.CRUD_UPDATE)
self._mark_read_tables(stmt)
def _handle_delete(self, stmt: exp.Delete):
target = stmt.this
if isinstance(target, exp.Table):
name = target.name
if name and name not in self._cte_names:
self._crud_map[name].add(self.CRUD_DELETE)
self._mark_read_tables(stmt)
テスト
ここまでのケースもテストで通るようにします。
import pytest
@pytest.mark.parametrize(
["sql", "expected"],
[
(
"SELECT * FROM user;",
{
"user": {"R"}
},
),
(
"INSERT INTO user (id, name) VALUES (1, 'aaa');",
{
"user": {"C"}
},
),
(
"UPDATE user SET name = 'aaa';",
{
"user": {"R", "U"}
},
),
(
"UPDATE user SET name = 'bbb' WHERE id = 1;",
{
"user": {"R", "U"}
},
),
(
"DELETE FROM user;",
{
"user": {"R", "D"}
},
),
(
"DELETE FROM user WHERE id = 1;",
{
"user": {"R", "D"}
},
),
(
"""SELECT
order.*
FROM
order
JOIN (
SELECT
id
FROM
product
WHERE
is_sale IS TRUE
) AS sale_products
ON order.product_id = sale_products.id;
""",
{
"product": {"R"},
"order": {"R"},
},
),
(
"""WITH sale_products AS (
SELECT
id
FROM
product
WHERE
is_sale is True
)
SELECT
order.*
FROM
order
JOIN sale_products ON order.product_id = sale_products.id;
""",
{
"product": {"R"},
"order": {"R"},
},
),
(
"""INSERT INTO
order (user_id, product_id)
(
SELECT
user_id,
product_id
FROM
order_history
WHERE
id = 101 AND
user_id = 1
);
""",
{
"order": {"C"},
"order_history": {"R"},
},
),
(
"""WITH cte_order_history AS (
SELECT
user_id,
product_id
FROM
order_history
WHERE
id = 101 AND
user_id = 1
)
INSERT INTO
order (user_id, product_id)
(
SELECT
user_id,
product_id
FROM
cte_order_history
);
""",
{
"order": {"C"},
"order_history": {"R"},
},
),
(
"""UPDATE
customer
JOIN
order ON customer.customer_id = order.customer_id
SET
customer.status = order.order_status
WHERE
customer.status = 'Active';
""",
{
"customer": {"R", "U"},
"order": {"R"},
},
),
(
"""DELETE FROM
customer
JOIN
order ON customer.customer_id = order.customer_id
WHERE
customer.status = 'Active';
""",
{
"customer": {"R", "D"},
"order": {"R"},
},
),
(
"""UPDATE
user u
SET u.last_ordered_at = (
SELECT MAX(o.ordered_at)
FROM order o
WHERE o.user_id = u.id
);
""",
{
"user": {"R", "U"},
"order": {"R"},
},
),
(
"""DELETE FROM
user u
WHERE
u.status = 'inactive'
AND u.created_at < CURRENT_DATE - INTERVAL '365 days'
AND NOT EXISTS (
SELECT
1
FROM
order o
WHERE
o.user_id = u.id
AND o.ordered_at >= CURRENT_DATE - INTERVAL '365 days'
);
""",
{
"user": {"R", "D"},
"order": {"R"},
},
),
(
"""UPDATE
user u
JOIN (
SELECT
o.user_id,
SUM(oi.quantity * oi.unit_price) AS total_amount
FROM
order o
JOIN order_item oi ON oi.order_id = o.id
WHERE
o.status = 'completed'
GROUP BY
o.user_id
HAVING
SUM(oi.quantity * oi.unit_price) >= 100000
) t ON t.user_id = u.id
SET
u.rank = 'VIP'
WHERE
u.rank <> 'VIP';
""",
{
"user": {"R", "U"},
"order": {"R"},
"order_item": {"R"},
},
),
(
"""WITH order_total AS (
SELECT
oh.order_id,
SUM(oi.quantity * oi.price) AS total_amount
FROM
order_history oh
JOIN order_item oi
ON oh.order_id = oi.order_id
GROUP BY
oh.order_id
)
UPDATE order_history AS oh
JOIN order_total AS ot
ON oh.order_id = ot.order_id
SET
oh.total_amount = ot.total_amount;
""",
{
"order_history": {"R", "U"},
"order_item": {"R"},
},
),
(
"""DELETE oi
FROM
order_item oi
JOIN order o ON oi.order_id = o.id
WHERE
o.status = 'canceled'
AND o.ordered_at < CURRENT_DATE - INTERVAL 30 DAY;
""",
{
"order": {"R"},
"order_item": {"R", "D"},
},
),
]
)
def test_extract_table_operate(sql, expected):
eto = ExtractTableOperate()
crud_map = eto.extract(sql)
assert crud_map == expected
発展
下記記事と組み合わせることで、エンドポイントごとのCRUD図を作成できます。
例えば、Google spreadsheetを組み合わせてテストをすべて流すCIに組み込めば、マージタイミングなどでCRUD図が自動生成されるDevOpsとなる案が考えられます。
おわりに
これで基本的なSQLのクエリからCRUD図を作成することができました。
本記事では テーブル: {CRUD} 形式のdictでの返却に留めているため、ここからさらに加工するなりして扱ってください。
以上、Python Advent Calendar 2025のシリーズ4 13日目でした。
最後まで読んでいただきありがとうございました。