2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Snowflake のテーブルデータを Databricks に統合する方法の実装案

Last updated at Posted at 2025-03-18

概要

Snowflake 上のテーブルデータを Databricks に同期する(データ統合する)方法を検討した結果を共有します。
近年、クラウド DWH の普及により導入コストが低下したことから複数の DWH を運用するケースが増え、データ統合の必要性がますます高まっています。
そのため、DWH 間でのデータ統合はデータ利活用における重要なテーマのひとつになりつつあります。

本記事では、Snowflake のテーブルデータを Databricks に同期する方法論を整理し、実装にあたっての考慮点をまとめました。

検討時の前提事項

データ統合実施者の前提事項

Databricks 側のプログラム開発を想定し、以下を前提として選択肢を検討します。

  1. リソースのコストと開発・運用コストを最小限に抑える
  2. Snowflake 側での設計・開発に関わっていない
  3. Snowflake のテーブルで実施された DELETE を反映できる

Snowflake における前提事項

Snowflake における制限を以下のとおり確認しています。

  1. Snowflake の Apache Iceberg™ テーブルを CLONING するとき、標準テーブルをソースにできない
  2. STREAM をソースとして INSERT する際、スキーマ進化(スキーマ展開)ができない

Snowflake の Apache Iceberg™ テーブルを CLONING で作成するときに標準テーブルをソースとできない仕様について、以下のようにドキュメントに記述されています。
「現在は」という文言があるため、将来的に Snowflake 標準テーブルからの CLONING がサポートされる可能性があり、その際には本記事の内容を再度検討する必要があります。

Apache Iceberg™ テーブル の場合、クローンは現在Snowflake管理テーブルでのみサポートされています。詳細については、 クローニングと Apache Iceberg™ テーブル をご参照ください。

image.png

出所:CREATE <オブジェクト> ... CLONE | Snowflake Documentation

STREAM のスキーマがソーステーブルのスキーマ変更に追随する仕様であるため、STREAM から INSERT するときはスキーマ進化(スキーマ展開)が必須です。しかし、Snowflake ドキュメントでは以下のように記載されており、データ連携先でソース側のスキーマ変更を取り込むための自動的な仕組みはありません。

INSERT の操作で自動的にターゲットテーブルのスキーマを進化させることはできません。

image.png

出所:テーブルスキーマの進化 | Snowflake Documentation

Databricks における前提事項

Databricks では、以下の制限を確認しています。

  • Unity Catalog が有効なクラスターで Apache Iceberg テーブルを参照する方法を現時点で確立できていない

Unity Catalog が有効なクラスターで Apache Iceberg テーブルを参照する方法については、以下の記事にまとめています。アクセスモードを「分離なし共有」に設定したクラスターでのみ参照できることを確認しています。
ただし、Databricks では Unity Catalog のマネージドテーブルを利用する方が開発・運用上は手軽なため、今回の検討では Apache Iceberg テーブルは参照しない方針としました。

実装案

現実的な実装案

現時点で現実的な実装案としては、以下の 1 つに絞られます。
Snowflake のテーブルをソースとした STREAM を作成し、CTAS で都度実体化したテーブルを Databricks に連携するのが最適と考えられます。

  1. Snowflake の STREAM を Databricks に連携する方法

限定的な状況下で実施可能な案

Snowflake 側でまったくスキーマ変更が発生しないか、あるいは変更があった場合に必ず Snowflake 側で反映させる運用が可能な場合は、以下も選択肢になります。

  1. Snowflake の STREAM の内容を Snowflake 管理の Iceberg テーブルに反映し、Databricks に連携する方法

この場合、Databricks 側で DEEP CLONE を利用することでデータ同期が容易になります。Apache Iceberg テーブルを CLONE する方法については、以下の記事で紹介しています。

Snowflake の STREAM を Databricks に連携する方法について

実装ロジック

Snowflake 上に STREAM が作成済みであることを前提とし、以下のロジックを想定しています。

  1. STREAM を実体化するテーブルにデータがないことを確認
    • データがあった場合は、そのテーブルから Databricks に連携
  2. Snowflake 上で STREAM を CTAS によりテーブルに実体化
  3. STREAM を実体化したテーブルを Spark Dataframe として読み込み、データ型の変更を確認
    • データ型の変更がある場合は、Databricks のテーブルに対してスキーマ変更を反映
  4. Spark Dataframe を Databricks のテーブルに APPEND
  5. Append したテーブルをもとに、Databricks のテーブルに MERGE を実行
  6. Snowflake 上で STREAM を実体化したテーブルを TRUNCATE TABLE

STREAM のデータを確実に特定し、Databricks 側に取り込むため、CTAS (CREATE TABLE AS SELECT) によるテーブル実体化を行います。

連携途中でエラーが発生した場合に備え、まず STREAM を実体化したテーブルの状態を確認し、そこにデータが残っている場合はそのテーブルから Databricks に再連携します。

さらに、Databricks のテーブルに対して APPEND と MERGE を行いますが、要件によっては MERGE のみでも問題ありません。

生成 AI により作成したコードを下記に示します。動作確認等は実施していないため、実装イメージとして捉えてください。共通のクラスを作成して、 STREAM ごとに並列で実行するイメージです。 Databricks Workflows 等によりノートブックを並列で実行させる方法がおすすめです。

コードのイメージ
import snowflake.connector
from pyspark.sql import SparkSession, DataFrame
from typing import Dict, Any

class SnowflakeToDatabricksIntegration:
    """
    Snowflake 上の STREAM を Databricks に同期するためのクラス。

    以下の処理を順番に実施する想定:
      1. STREAM を実体化するテーブル (snowflake_stream_table) にデータがないか確認
         - データがあった場合は、そのテーブルから先に連携処理を実施
      2. Snowflake 上で STREAM を CTAS によりテーブルに実体化
      3. STREAM を実体化したテーブルを Spark DataFrame として読み込み、スキーマを確認
         - 必要に応じて Databricks 側のテーブルへスキーマ変更を反映
      4. Spark DataFrame を Databricks のテーブルに APPEND
      5. APPEND したテーブルを元に、Databricks の Target テーブルに MERGE を実行
      6. Snowflake 上で STREAM 実体化テーブルを TRUNCATE TABLE
    """

    def __init__(
        self,
        spark: SparkSession,
        snowflake_options: Dict[str, Any],
        snowflake_account: str,
        snowflake_user: str,
        snowflake_password: str,
        snowflake_database: str,
        snowflake_schema: str,
        snowflake_warehouse: str,
        snowflake_stream_name: str,
        snowflake_stream_table: str,
        databricks_target_table: str,
        databricks_staging_table: str
    ):
        """
        コンストラクタ

        Parameters
        ----------
        spark : SparkSession
            Databricks 上で実行している SparkSession
        snowflake_options : Dict[str, Any]
            Spark-Snowflake Connector 用のオプション (sfUrl, sfUser, sfPassword, sfDatabase, sfSchema, sfWarehouse 等)
        snowflake_account : str
            Snowflake のアカウント名 (例: xxx.yyy.azure)
        snowflake_user : str
            Snowflake ユーザ名
        snowflake_password : str
            Snowflake パスワード
        snowflake_database : str
            対象の Snowflake データベース名
        snowflake_schema : str
            対象の Snowflake スキーマ名
        snowflake_warehouse : str
            対象の Snowflake ウェアハウス名
        snowflake_stream_name : str
            STREAM オブジェクト名 (ソースとなる STREAM)
        snowflake_stream_table : str
            STREAM を CTAS で実体化するテーブル名
        databricks_target_table : str
            Databricks 側で最終的に統合するテーブル名
        databricks_staging_table : str
            Databricks 側で一時的に APPEND するテーブル名(MERGE 前のステージング用)
        """
        self.spark = spark
        self.snowflake_options = snowflake_options
        self.snowflake_account = snowflake_account
        self.snowflake_user = snowflake_user
        self.snowflake_password = snowflake_password
        self.snowflake_database = snowflake_database
        self.snowflake_schema = snowflake_schema
        self.snowflake_warehouse = snowflake_warehouse
        self.snowflake_stream_name = snowflake_stream_name
        self.snowflake_stream_table = snowflake_stream_table
        self.databricks_target_table = databricks_target_table
        self.databricks_staging_table = databricks_staging_table

    def run_integration(self):
        """
        Snowflake → Databricks の連携フローを実行する。
        """
        # 1. STREAM 実体化テーブルの存在チェック (既存データがあれば先に連携)
        if self._check_snowflake_table_has_data(self.snowflake_stream_table):
            print(f"既存の {self.snowflake_stream_table} にデータが存在するため、先に連携します。")
            self._process_snowflake_stream_table()

        # 2. Snowflake の STREAM を CTAS によりテーブルに実体化
        print(f"{self.snowflake_stream_name} を CTAS で実体化します...")
        self._ctas_stream_table()

        # 3. 実体化したテーブルを Spark DataFrame として読み込み、スキーマを確認
        print(f"{self.snowflake_stream_table} のデータを Spark DataFrame として読み込みます。")
        stream_df = self._read_snowflake_stream_table()

        print("Databricks 側テーブルのスキーマと比較し、必要に応じてスキーマ進化を反映します。")
        self._check_and_evolve_schema(stream_df, self.databricks_staging_table)

        # 4. Spark DataFrame を Databricks の Staging テーブルに APPEND
        print(f"DataFrame を Databricks テーブル `{self.databricks_staging_table}` に APPEND します。")
        self._append_to_databricks_table(stream_df, self.databricks_staging_table)

        # 5. Staging テーブルをもとに Target テーブルに MERGE を実行
        print(f"MERGE を実行し、`{self.databricks_target_table}` を更新します。")
        self._merge_into_target_table()

        # 6. Snowflake 上の STREAM 実体化テーブルを TRUNCATE
        print(f"Snowflake のテーブル `{self.snowflake_stream_table}` を TRUNCATE します。")
        self._truncate_snowflake_table(self.snowflake_stream_table)

        print("全ての処理が完了しました。")

    def _check_snowflake_table_has_data(self, table_name: str) -> bool:
        """
        指定した Snowflake テーブルにデータが存在するかチェックする。

        Returns
        -------
        bool
            True: データあり, False: 空
        """
        df = self.spark.read \
            .format("snowflake") \
            .options(**self.snowflake_options) \
            .option("dbtable", f"{self.snowflake_database}.{self.snowflake_schema}.{table_name}") \
            .load()
        return bool(df.head(1))

    def _process_snowflake_stream_table(self):
        """
        既存の Snowflake STREAM 実体化テーブルにデータが残っている場合の処理。
        連携後、Snowflake テーブルを TRUNCATE します。
        """
        df = self._read_snowflake_stream_table()
        self._check_and_evolve_schema(df, self.databricks_staging_table)
        self._append_to_databricks_table(df, self.databricks_staging_table)
        self._merge_into_target_table()
        self._truncate_snowflake_table(self.snowflake_stream_table)

    def _ctas_stream_table(self):
        """
        Snowflake 上で CTAS を実行し、STREAM を実体化する。
        """
        query = f"""
        CREATE OR REPLACE TABLE {self.snowflake_database}.{self.snowflake_schema}.{self.snowflake_stream_table} AS
        SELECT *
        FROM {self.snowflake_database}.{self.snowflake_schema}.{self.snowflake_stream_name}
        """
        self._run_snowflake_query(query)

    def _read_snowflake_stream_table(self) -> DataFrame:
        """
        Snowflake の STREAM 実体化テーブルを Spark DataFrame として読み込む。

        Returns
        -------
        DataFrame
            読み込んだデータ
        """
        df = self.spark.read \
            .format("snowflake") \
            .options(**self.snowflake_options) \
            .option("dbtable", f"{self.snowflake_database}.{self.snowflake_schema}.{self.snowflake_stream_table}") \
            .load()
        return df

    def _check_and_evolve_schema(self, source_df: DataFrame, dbx_table_name: str):
        """
        Databricks 側テーブルと比較し、スキーマ差分がある場合は必要な変更を反映する。
        Delta テーブルの場合は mergeSchema オプションにより自動拡張できるため、現状はスタブ実装です。

        Parameters
        ----------
        source_df : DataFrame
            Snowflake から読み込んだデータ
        dbx_table_name : str
            対象の Databricks テーブル名
        """
        if not self.spark.catalog.tableExists(dbx_table_name):
            print(f"Databricks テーブル `{dbx_table_name}` は存在しないため、スキーマ進化は不要です。")
            return

        print(f"Databricks テーブル `{dbx_table_name}` のスキーマ進化チェックを実施します。")
        # ※必要に応じ、source_df.schema と対象テーブルのスキーマを比較し、
        # ALTER TABLE ADD COLUMNS 等の対応を実施してください。
        pass

    def _append_to_databricks_table(self, df: DataFrame, dbx_table_name: str):
        """
        DataFrame を Databricks のテーブルに APPEND する。
        Delta テーブルの場合、mergeSchema オプションを有効にして書き込む。

        Parameters
        ----------
        df : DataFrame
            書き込み対象の DataFrame
        dbx_table_name : str
            対象の Databricks テーブル名
        """
        df.write \
          .format("delta") \
          .option("mergeSchema", "true") \
          .mode("append") \
          .saveAsTable(dbx_table_name)

    def _merge_into_target_table(self):
        """
        Databricks の Staging テーブルから Target テーブルに MERGE を実行する。
        ※実際の要件に合わせ、ON 条件やカラム名を適宜修正してください。
        """
        merge_sql = f"""
            MERGE INTO {self.databricks_target_table} AS T
            USING {self.databricks_staging_table} AS S
            ON T.id = S.id
            WHEN MATCHED AND S.metadata$action = 'DELETE' THEN DELETE
            WHEN MATCHED THEN
                UPDATE SET
                    T.col1 = S.col1,
                    T.col2 = S.col2
            WHEN NOT MATCHED THEN
                INSERT (id, col1, col2)
                VALUES (S.id, S.col1, S.col2)
        """
        print("実行する MERGE SQL:")
        print(merge_sql)
        self.spark.sql(merge_sql)

    def _truncate_snowflake_table(self, table_name: str):
        """
        Snowflake 上のテーブルを TRUNCATE する。
        """
        query = f"TRUNCATE TABLE {self.snowflake_database}.{self.snowflake_schema}.{table_name}"
        self._run_snowflake_query(query)

    def _run_snowflake_query(self, query: str):
        """
        Python の snowflake.connector を利用し、Snowflake 上でクエリを実行する。
        """
        conn = snowflake.connector.connect(
            user=self.snowflake_user,
            password=self.snowflake_password,
            account=self.snowflake_account,
            warehouse=self.snowflake_warehouse,
            database=self.snowflake_database,
            schema=self.snowflake_schema
        )
        try:
            with conn.cursor() as cur:
                cur.execute(query)
                # 必要に応じて明示的な commit を実施
                conn.commit()
        finally:
            conn.close()
# =============================================================================
# 利用例 (Databricks Notebook 等で実行する場合)
# =============================================================================
# SparkSession (spark) は既に生成済みと仮定

# Snowflake 接続情報の例
snowflake_options = {
    "sfUrl": "xxxxxx.snowflakecomputing.com",
    "sfUser": "YOUR_USER",
    "sfPassword": "YOUR_PASSWORD",
    "sfDatabase": "YOUR_DATABASE",
    "sfSchema": "YOUR_SCHEMA",
    "sfWarehouse": "YOUR_WAREHOUSE",
}

integration = SnowflakeToDatabricksIntegration(
    spark=spark,
    snowflake_options=snowflake_options,
    snowflake_account="xxxxxx",  # 例: abcde12345.east-us-2.azure
    snowflake_user="YOUR_USER",
    snowflake_password="YOUR_PASSWORD",
    snowflake_database="YOUR_DATABASE",
    snowflake_schema="YOUR_SCHEMA",
    snowflake_warehouse="YOUR_WAREHOUSE",
    snowflake_stream_name="YOUR_STREAM_NAME",        # 既存の STREAM 名
    snowflake_stream_table="YOUR_STREAM_TABLE",       # CTAS で作成するテーブル名
    databricks_target_table="your_db.target_table",   # MERGE 先の本番テーブル
    databricks_staging_table="your_db.staging_table"  # 一時的に APPEND するステージングテーブル
)

# メイン処理実行
integration.run_integration()

関連情報

Snowflake にて利用する機能について

Snowflake では以下の機能を主に利用します。

  • STREAM

STREAM については以下の検証記事を投稿しています。STREAM は厳密な CDC を実施できるわけではないものの、前回の連携以降に DELETE されたレコードを特定できるため有用です。 DML 操作ごとに保持されるデータパターンが異なるため、ダウンストリームで MERGE を行う際にはそれを考慮する必要があります。

Databricks にて利用する機能について

Databricks では以下の機能を主に利用します。

  • 書き込み時のスキーマ展開(Schema Evolution)
  • 型拡張(Type widening)
2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?