表題の作業を実際に行ってみました。問題点や、その一部の解消方法をメモしました。
はじめに
SQLMesh とは
SQLMeshは「dbt に代わるデータ変換ツール」として注目されています。
Tobiko Data社を中心にOSSとして開発されているほか、Tobiko Cloudとしてマネージドサービスも提供しています。
SQLMeshの主な特徴は以下の通りです。
参考: Tobiko - dbt tag
virtual data environments
開発環境、検証環境などを view の切り替えを使って、実体データの複製を最小限に止める仕組みです。
(Copy-On-Writeに似た、「変更が生じたテーブルのみ新しく作成し、その他は既存のテーブルを参照させる方法」をとっているようです)
開発環境で動作確認したものを本番環境に適用する場合や、切り戻しをする場合も、view の参照先を切り替えるだけなので、短時間で行えます。
「本物のデータを使わないと検証にならない」というのはよくある状況だと思いますが、モデルの開発者には個人情報へのアクセスを制限する必要があるなど、セキュリティやガバナンス観点で環境分離を行なっている場合も多いと思います。
virtual data environment による環境分離だけでモデル開発を進めるかどうか、組織のポリシー次第になるのではないでしょうか。
SQLMesh でも gateway という接続先を分離することで複数の環境を分けることも可能で、その辺りの手順がこちらのページに解説されています。
Isolated systems guide - SQLMesh
column-level lineage
SQLGlot というライブラリを内部で利用しており、クエリの内容をAST(抽象構文木)レベルで解釈できるので、カラムレベルでの系統情報を扱うことができます。
また、モデルの変更を行う際に、その変更が下流のモデルに影響するのかどうかをシステムが推定し、必要なものだけを再作成することができます。下流のモデルをすべて再作成するのに比べて、計算のコストを低減することができます。
state awareness
dbt の incremental なマテリアライズ方式では、「既存のデータに含まれる最新の日時以降に、上流で増加したレコード」を取り込むことができます。SQLMeshではモデルごとに処理対象の時間間隔や、実行の頻度を指定し(interval approach)、「Modelでどの時間区分を処理したか」というstateを追跡管理します。
差分更新のモデルであっても、過去のデータを部分的に再処理したり、最初に大きなデータを取り込む場合にも分割して処理することができます。
dlt とは
検索すると、Databricks の Delta Live Tables の結果が混じってくるので厄介ですが、"data load tool" の略称で、データロードを行うオープンソースのPythonライブラリです。
「Pythonのライブラリ」なので、Notebook上でも動かせます。また、スクリプトを書けば、さまざまなデータソースを簡単に取り込むことができます。
データの構造や型から、ロードするテーブルのスキーマを自動的に作成してくれます。
dlt でのデータロード
下記のようなサンプルのデータロードスクリプトを用意しました。
duckdb と snowflake の両方に、同じデータをロードします。
import dlt
DUCKDB_PATH = "./data/dialect_check_data.duckdb"
pipeline_snow = dlt.pipeline(
pipeline_name="dialect_check_snowflake",
destination="snowflake",
dataset_name="dia_check_raw",
dev_mode=False,
)
duckdb_destination = dlt.destinations.duckdb(
DUCKDB_PATH,
enable_dataset_name_normalization=False)
#breakpoint()
pipeline_duck = dlt.pipeline(
pipeline_name="dialect_check_duckdb",
destination=duckdb_destination,
dataset_name="dia_check_raw",
dev_mode=False,
)
data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
if __name__ == "__main__":
for pipeline in [pipeline_snow, pipeline_duck]:
#for pipeline in [pipeline_snow, ]:
load_info = pipeline.run(
data,
table_name="items",
write_disposition={
"disposition": "append",
},
refresh="drop_resources"
)
print(load_info)
dlt の接続情報は、project_root/.dlt/secrets.toml
に保存します。
この結果、duckdb 側では dataset name である dia_check_raw
が小文字のままカタログ名として利用されます。
一方、snowflake 側では 大文字の DIA_CHECK_RAW
がスキーマ名として利用されます。
SQLMesh では、sqlglotを利用して、1つの方言(dialect)で書かれたModelファイルで複数のDWHを管理することができる 1 設計になっているのですが、Modelで読みに行く先のカタログ名/スキーマ名のCaseが異なる、という事象に対応する必要が生じます。
SQLMesh によるモデルの構築
パイプライン名を確認します
% pdm run dlt pipeline --list-pipelines
11 pipelines found in /Users/m-nakamura/.dlt/pipelines
dialect_check_snowflake
dialect_check_duckdb
...
パイプライン名を指定して、SQLMesh の modelを自動生成することができます。
参考: dlt - SQLMesh
以下の例は、raw_csv_import_dev
という dlt pipeline をもとに、duckdb dialect の SQLMesh モデルを生成するコマンドです。
% pdm run sqlmesh init -t dlt --dlt-pipeline raw_csv_import_dev duckdb
以下のようなファイルが生成されます
.
├── audits
├── config.yaml
├── macros
├── models
│ ├── incremental__dlt_loads.sql
│ └── incremental_items.sql
├── seeds
└── tests
(incremental__dlt_loads.sql は dlt で利用するメタデータなので不要かもしれません)
incremental_items.sql は以下のような内容です。
kind が差分更新になっています。
MODEL (
name dia_check_raw_sqlmesh.incremental_items,
kind INCREMENTAL_BY_TIME_RANGE (
time_column _dlt_load_time,
),
);
SELECT
CAST(c.id AS BIGINT) AS id,
CAST(c.name AS TEXT) AS name,
CAST(c._dlt_load_id AS TEXT) AS _dlt_load_id,
CAST(c._dlt_id AS TEXT) AS _dlt_id,
TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) as _dlt_load_time
FROM
dia_check_raw.items as c
WHERE
TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds
SQLMesh で利用する接続の情報は、project_root/config.yaml
に保存します。
SQLMesh では、複数の独立したシステム(本番環境 / 検証環境 のようなもの)に対応することができます。「仮想環境」という用語と混乱を招く恐れがあるため、SQLMeshのドキュメントの中では 'isolated system' と呼ばれています。
Isolated systems guide - SQLMesh
具体的には、config.yaml
のなかに複数の gateway
を定義し、切り替えて利用することができます。
作成する Model には SQL の dialect を設定することができ、gateway で指定する接続先のタイプに応じて Model の SQL が sqlglot によって変換され、実行される、というコンセプトになっているようです。
tobymao/sqlglot: Python SQL Parser and Transpiler
複数gatewayの罠
複数のgatewayを利用するには、いくつか問題がありました。
大文字で作成される Snowflake 上のスキーマ名が見つからない問題
dlt で作成されるソーステーブルは、Snowflake上では大文字で作成されるのですが、sqlglot で変換されるクエリでは小文字のスキーマ名を探しに行くため、クエリが失敗してしまいます。
この問題は、「gateway ごとの model_defaults.dialect で normalization_strategy を指定できる機能」によって回避できます。
具体的には、下記のようなオプションを config.yaml
に指定します。
gateways:
duckdb:
connection:
type: duckdb
database: /path/to/dialect_check_data.duckdb
snowflake:
connection:
type: snowflake
account: xxxx
user: xxxx
private_key: |
-----BEGIN PRIVATE KEY-----
some_private_key
==
-----END PRIVATE KEY-----
database: xxxx
warehouse: xxxx
role: xxxxxx
model_defaults:
#see https://github.com/tobymao/sqlglot/blob/main/sqlglot/dialects/dialect.py#L92
# uppercase means "Unquoted identifiers are uppercased."
dialect: duckdb, normalization_strategy=uppercase <- ココ
default_gateway: duckdb
model_defaults:
dialect: duckdb
start: 2025-02-24
SHA2 / SHA256 など、SQLエンジンによって名前が異なる関数がある問題
Python 3.13.1 (main, Dec 3 2024, 17:59:52) [Clang 16.0.0 (clang-1600.0.26.4)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlglot
>>> sqlglot.__version__
'26.7.0'
>>> sqlglot.parse_one("SELECT sha256(col1)", dialect="duckdb").sql(dialect="snowflake")
'SELECT SHA256(col1)' #<- 本来は SHA2(col1)になるべき
これは、SQLGlotのduckdb dialect の Parser に問題があったことが原因でした。
>>> sqlglot.parse_one("SELECT sha256(col1)", dialect="duckdb")
Select(
expressions=[
Anonymous( #<- ここがSHA2として認識されないと、snowflake dialect として正しく変換されない
this=sha256,
expressions=[
Column(
this=Identifier(this=col1, quoted=False))])])
以下のチケットで現在は修正されています。
Parser
チケット: The sha256 function is not recognized correctly in duckdb dialect. · Issue #4815 · tobymao/sqlglot
TO_TIMESTAMP() の引数が、duckdbだとdoubleを受け付けるが、snowflakeだとintしか受け付けない問題
Model の中で使われている TO_TIMESTAMP()
関数の引数が、duckdb だと double だが、snowflake だと int という差異がありました。
そのため、modelに下記のような記述があると、snowflake側で実行エラーになってしまいました。
TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) as _dlt_load_time
snowflake dialect の Generator では、TO_TIMESTAMP は 以下のように書かれており、単にrenameだけしているようでした。
exp.UnixToTime: rename_func("TO_TIMESTAMP"),
この問題は、duckdbのTO_TIMESTAMP() 関数が int も受け付けることがわかったため、以下のようにmodelのクエリを変更することで解消できました
TO_TIMESTAMP(CAST(c._dlt_load_id AS INT)) as _dlt_load_time
(おそらく、sqlglot の snowflake dialect のGeneratorを変更しても対応可能だったと思います)
参考:
現時点での所感
「sqlglot を使い、複数のSQLエンジンを一つのModelファイルでサポートする」というコンセプトは、(dialectの対応でハマる恐れはあるものの)ローカルのduckdbをモデルの開発に使うことができるので非常に魅力的だと感じました。
また、sqlmesh の仮想環境 は、一見複雑そうに感じましたが、「仮想環境の切り替えはView の定義の切り替えに相当する」と考えれば、コントロール可能である、という印象をもちました。