0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

dlt でロードしたデータを SQLMesh で扱ってみる

Last updated at Posted at 2025-02-28

表題の作業を実際に行ってみました。問題点や、その一部の解消方法をメモしました。

はじめに

SQLMesh とは

SQLMeshは「dbt に代わるデータ変換ツール」として注目されています。
Tobiko Data社を中心にOSSとして開発されているほか、Tobiko Cloudとしてマネージドサービスも提供しています。

SQLMeshの主な特徴は以下の通りです。
参考: Tobiko - dbt tag

virtual data environments

開発環境、検証環境などを view の切り替えを使って、実体データの複製を最小限に止める仕組みです。
Copy-On-Writeに似た、「変更が生じたテーブルのみ新しく作成し、その他は既存のテーブルを参照させる方法」をとっているようです)
開発環境で動作確認したものを本番環境に適用する場合や、切り戻しをする場合も、view の参照先を切り替えるだけなので、短時間で行えます。

「本物のデータを使わないと検証にならない」というのはよくある状況だと思いますが、モデルの開発者には個人情報へのアクセスを制限する必要があるなど、セキュリティやガバナンス観点で環境分離を行なっている場合も多いと思います。
virtual data environment による環境分離だけでモデル開発を進めるかどうか、組織のポリシー次第になるのではないでしょうか。

SQLMesh でも gateway という接続先を分離することで複数の環境を分けることも可能で、その辺りの手順がこちらのページに解説されています。

Isolated systems guide - SQLMesh

column-level lineage

SQLGlot というライブラリを内部で利用しており、クエリの内容をAST(抽象構文木)レベルで解釈できるので、カラムレベルでの系統情報を扱うことができます。
また、モデルの変更を行う際に、その変更が下流のモデルに影響するのかどうかをシステムが推定し、必要なものだけを再作成することができます。下流のモデルをすべて再作成するのに比べて、計算のコストを低減することができます。

state awareness

dbt の incremental なマテリアライズ方式では、「既存のデータに含まれる最新の日時以降に、上流で増加したレコード」を取り込むことができます。SQLMeshではモデルごとに処理対象の時間間隔や、実行の頻度を指定し(interval approach)、「Modelでどの時間区分を処理したか」というstateを追跡管理します。
差分更新のモデルであっても、過去のデータを部分的に再処理したり、最初に大きなデータを取り込む場合にも分割して処理することができます。

dlt とは

検索すると、Databricks の Delta Live Tables の結果が混じってくるので厄介ですが、"data load tool" の略称で、データロードを行うオープンソースのPythonライブラリです。

Introduction | dlt Docs

「Pythonのライブラリ」なので、Notebook上でも動かせます。また、スクリプトを書けば、さまざまなデータソースを簡単に取り込むことができます。

データの構造や型から、ロードするテーブルのスキーマを自動的に作成してくれます。

dlt でのデータロード

下記のようなサンプルのデータロードスクリプトを用意しました。
duckdb と snowflake の両方に、同じデータをロードします。

import dlt


DUCKDB_PATH = "./data/dialect_check_data.duckdb"

pipeline_snow = dlt.pipeline(
    pipeline_name="dialect_check_snowflake",
    destination="snowflake",
    dataset_name="dia_check_raw",
    dev_mode=False,
)

duckdb_destination = dlt.destinations.duckdb(
        DUCKDB_PATH,
        enable_dataset_name_normalization=False)
#breakpoint()
pipeline_duck = dlt.pipeline(
    pipeline_name="dialect_check_duckdb",
    destination=duckdb_destination,
    dataset_name="dia_check_raw",
    dev_mode=False,
)

data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

if __name__ == "__main__":
    for pipeline in [pipeline_snow, pipeline_duck]:
    #for pipeline in [pipeline_snow, ]:
        load_info = pipeline.run(
            data,
            table_name="items",
            write_disposition={
                "disposition": "append",
            },
            refresh="drop_resources"
        )
        print(load_info)

dlt の接続情報は、project_root/.dlt/secrets.toml に保存します。

この結果、duckdb 側では dataset name である dia_check_raw が小文字のままカタログ名として利用されます。
duckdb_catalog_name.jpg

一方、snowflake 側では 大文字の DIA_CHECK_RAW がスキーマ名として利用されます。
snowflake_schema_name.jpg

SQLMesh では、sqlglotを利用して、1つの方言(dialect)で書かれたModelファイルで複数のDWHを管理することができる 1 設計になっているのですが、Modelで読みに行く先のカタログ名/スキーマ名のCaseが異なる、という事象に対応する必要が生じます。

SQLMesh によるモデルの構築

パイプライン名を確認します

% pdm run dlt pipeline --list-pipelines
11 pipelines found in /Users/m-nakamura/.dlt/pipelines
dialect_check_snowflake
dialect_check_duckdb

...

パイプライン名を指定して、SQLMesh の modelを自動生成することができます。
参考: dlt - SQLMesh

以下の例は、raw_csv_import_dev という dlt pipeline をもとに、duckdb dialect の SQLMesh モデルを生成するコマンドです。

% pdm run sqlmesh init -t dlt --dlt-pipeline raw_csv_import_dev duckdb

以下のようなファイルが生成されます

.
├── audits
├── config.yaml
├── macros
├── models
│   ├── incremental__dlt_loads.sql
│   └── incremental_items.sql
├── seeds
└── tests

(incremental__dlt_loads.sql は dlt で利用するメタデータなので不要かもしれません)

incremental_items.sql は以下のような内容です。
kind が差分更新になっています。

参考: Model kinds - SQLMesh

MODEL (
  name dia_check_raw_sqlmesh.incremental_items,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column _dlt_load_time,
  ),
);

SELECT
  CAST(c.id AS BIGINT) AS id,
  CAST(c.name AS TEXT) AS name,
  CAST(c._dlt_load_id AS TEXT) AS _dlt_load_id,
  CAST(c._dlt_id AS TEXT) AS _dlt_id,
  TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) as _dlt_load_time
FROM
  dia_check_raw.items as c
WHERE
  TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds

SQLMesh で利用する接続の情報は、project_root/config.yaml に保存します。

SQLMesh では、複数の独立したシステム(本番環境 / 検証環境 のようなもの)に対応することができます。「仮想環境」という用語と混乱を招く恐れがあるため、SQLMeshのドキュメントの中では 'isolated system' と呼ばれています。
Isolated systems guide - SQLMesh
具体的には、config.yaml のなかに複数の gateway を定義し、切り替えて利用することができます。

作成する Model には SQL の dialect を設定することができ、gateway で指定する接続先のタイプに応じて Model の SQL が sqlglot によって変換され、実行される、というコンセプトになっているようです。
tobymao/sqlglot: Python SQL Parser and Transpiler

複数gatewayの罠

複数のgatewayを利用するには、いくつか問題がありました。

大文字で作成される Snowflake 上のスキーマ名が見つからない問題

dlt で作成されるソーステーブルは、Snowflake上では大文字で作成されるのですが、sqlglot で変換されるクエリでは小文字のスキーマ名を探しに行くため、クエリが失敗してしまいます。

この問題は、「gateway ごとの model_defaults.dialect で normalization_strategy を指定できる機能」によって回避できます。

具体的には、下記のようなオプションを config.yaml に指定します。

gateways:
  duckdb:
    connection:
      type: duckdb
      database: /path/to/dialect_check_data.duckdb
  snowflake:
    connection:
      type: snowflake
      account: xxxx
      user: xxxx
      private_key: |
        -----BEGIN PRIVATE KEY-----
        some_private_key
        ==
        -----END PRIVATE KEY-----
      database: xxxx
      warehouse: xxxx
      role: xxxxxx
    model_defaults:
      #see  https://github.com/tobymao/sqlglot/blob/main/sqlglot/dialects/dialect.py#L92
      # uppercase means "Unquoted identifiers are uppercased."
      dialect: duckdb, normalization_strategy=uppercase  <- ココ

default_gateway: duckdb

model_defaults:
  dialect: duckdb
  start: 2025-02-24

Feat: Add model defaults support per gateway configuration by VaggelisD · Pull Request #3888 · TobikoData/sqlmesh

SHA2 / SHA256 など、SQLエンジンによって名前が異なる関数がある問題

Python 3.13.1 (main, Dec  3 2024, 17:59:52) [Clang 16.0.0 (clang-1600.0.26.4)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlglot
>>> sqlglot.__version__
'26.7.0'
>>> sqlglot.parse_one("SELECT sha256(col1)", dialect="duckdb").sql(dialect="snowflake")
'SELECT SHA256(col1)' #<- 本来は SHA2(col1)になるべき

これは、SQLGlotのduckdb dialect の Parser に問題があったことが原因でした。

>>> sqlglot.parse_one("SELECT sha256(col1)", dialect="duckdb")
Select(
  expressions=[
    Anonymous(   #<-  ここがSHA2として認識されないと、snowflake dialect として正しく変換されない
      this=sha256,
      expressions=[
        Column(
          this=Identifier(this=col1, quoted=False))])])

以下のチケットで現在は修正されています。
Parser
チケット: The sha256 function is not recognized correctly in duckdb dialect. · Issue #4815 · tobymao/sqlglot

TO_TIMESTAMP() の引数が、duckdbだとdoubleを受け付けるが、snowflakeだとintしか受け付けない問題

Model の中で使われている TO_TIMESTAMP()関数の引数が、duckdb だと double だが、snowflake だと int という差異がありました。

そのため、modelに下記のような記述があると、snowflake側で実行エラーになってしまいました。

TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) as _dlt_load_time

snowflake dialect の Generator では、TO_TIMESTAMP は 以下のように書かれており、単にrenameだけしているようでした。

exp.UnixToTime: rename_func("TO_TIMESTAMP"),

この問題は、duckdbのTO_TIMESTAMP() 関数が int も受け付けることがわかったため、以下のようにmodelのクエリを変更することで解消できました

TO_TIMESTAMP(CAST(c._dlt_load_id AS INT)) as _dlt_load_time

(おそらく、sqlglot の snowflake dialect のGeneratorを変更しても対応可能だったと思います)
参考:

現時点での所感

「sqlglot を使い、複数のSQLエンジンを一つのModelファイルでサポートする」というコンセプトは、(dialectの対応でハマる恐れはあるものの)ローカルのduckdbをモデルの開発に使うことができるので非常に魅力的だと感じました。
また、sqlmesh の仮想環境 は、一見複雑そうに感じましたが、「仮想環境の切り替えはView の定義の切り替えに相当する」と考えれば、コントロール可能である、という印象をもちました。

  1. Tobiko - Automated Data Warehouse Migrations

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?