概要
Snowflake 上のテーブルデータを Databricks に同期する(データ統合する)方法を検討した結果を共有します。
近年、クラウド DWH の普及により導入コストが低下したことから複数の DWH を運用するケースが増え、データ統合の必要性がますます高まっています。
そのため、DWH 間でのデータ統合はデータ利活用における重要なテーマのひとつになりつつあります。
本記事では、Snowflake のテーブルデータを Databricks に同期する方法論を整理し、実装にあたっての考慮点をまとめました。
検討時の前提事項
データ統合実施者の前提事項
Databricks 側のプログラム開発を想定し、以下を前提として選択肢を検討します。
- リソースのコストと開発・運用コストを最小限に抑える
- Snowflake 側での設計・開発に関わっていない
- Snowflake のテーブルで実施された DELETE を反映できる
Snowflake における前提事項
Snowflake における制限を以下のとおり確認しています。
- Snowflake の Apache Iceberg™ テーブルを CLONING するとき、標準テーブルをソースにできない
- STREAM をソースとして INSERT する際、スキーマ進化(スキーマ展開)ができない
Snowflake の Apache Iceberg™ テーブルを CLONING で作成するときに標準テーブルをソースとできない仕様について、以下のようにドキュメントに記述されています。
「現在は」という文言があるため、将来的に Snowflake 標準テーブルからの CLONING がサポートされる可能性があり、その際には本記事の内容を再度検討する必要があります。
Apache Iceberg™ テーブル の場合、クローンは現在Snowflake管理テーブルでのみサポートされています。詳細については、 クローニングと Apache Iceberg™ テーブル をご参照ください。
出所:CREATE <オブジェクト> ... CLONE | Snowflake Documentation
STREAM のスキーマがソーステーブルのスキーマ変更に追随する仕様であるため、STREAM から INSERT するときはスキーマ進化(スキーマ展開)が必須です。しかし、Snowflake ドキュメントでは以下のように記載されており、データ連携先でソース側のスキーマ変更を取り込むための自動的な仕組みはありません。
INSERT の操作で自動的にターゲットテーブルのスキーマを進化させることはできません。
出所:テーブルスキーマの進化 | Snowflake Documentation
Databricks における前提事項
Databricks では、以下の制限を確認しています。
- Unity Catalog が有効なクラスターで Apache Iceberg テーブルを参照する方法を現時点で確立できていない
Unity Catalog が有効なクラスターで Apache Iceberg テーブルを参照する方法については、以下の記事にまとめています。アクセスモードを「分離なし共有」に設定したクラスターでのみ参照できることを確認しています。
ただし、Databricks では Unity Catalog のマネージドテーブルを利用する方が開発・運用上は手軽なため、今回の検討では Apache Iceberg テーブルは参照しない方針としました。
実装案
現実的な実装案
現時点で現実的な実装案としては、以下の 1 つに絞られます。
Snowflake のテーブルをソースとした STREAM を作成し、CTAS で都度実体化したテーブルを Databricks に連携するのが最適と考えられます。
- Snowflake の STREAM を Databricks に連携する方法
限定的な状況下で実施可能な案
Snowflake 側でまったくスキーマ変更が発生しないか、あるいは変更があった場合に必ず Snowflake 側で反映させる運用が可能な場合は、以下も選択肢になります。
- Snowflake の STREAM の内容を Snowflake 管理の Iceberg テーブルに反映し、Databricks に連携する方法
この場合、Databricks 側で DEEP CLONE を利用することでデータ同期が容易になります。Apache Iceberg テーブルを CLONE する方法については、以下の記事で紹介しています。
Snowflake の STREAM を Databricks に連携する方法について
実装ロジック
Snowflake 上に STREAM が作成済みであることを前提とし、以下のロジックを想定しています。
- STREAM を実体化するテーブルにデータがないことを確認
- データがあった場合は、そのテーブルから Databricks に連携
- Snowflake 上で STREAM を CTAS によりテーブルに実体化
- STREAM を実体化したテーブルを Spark Dataframe として読み込み、データ型の変更を確認
- データ型の変更がある場合は、Databricks のテーブルに対してスキーマ変更を反映
- Spark Dataframe を Databricks のテーブルに APPEND
- Append したテーブルをもとに、Databricks のテーブルに MERGE を実行
- Snowflake 上で STREAM を実体化したテーブルを TRUNCATE TABLE
STREAM のデータを確実に特定し、Databricks 側に取り込むため、CTAS (CREATE TABLE AS SELECT) によるテーブル実体化を行います。
連携途中でエラーが発生した場合に備え、まず STREAM を実体化したテーブルの状態を確認し、そこにデータが残っている場合はそのテーブルから Databricks に再連携します。
さらに、Databricks のテーブルに対して APPEND と MERGE を行いますが、要件によっては MERGE のみでも問題ありません。
生成 AI により作成したコードを下記に示します。動作確認等は実施していないため、実装イメージとして捉えてください。共通のクラスを作成して、 STREAM ごとに並列で実行するイメージです。 Databricks Workflows 等によりノートブックを並列で実行させる方法がおすすめです。
コードのイメージ
import snowflake.connector
from pyspark.sql import SparkSession, DataFrame
from typing import Dict, Any
class SnowflakeToDatabricksIntegration:
"""
Snowflake 上の STREAM を Databricks に同期するためのクラス。
以下の処理を順番に実施する想定:
1. STREAM を実体化するテーブル (snowflake_stream_table) にデータがないか確認
- データがあった場合は、そのテーブルから先に連携処理を実施
2. Snowflake 上で STREAM を CTAS によりテーブルに実体化
3. STREAM を実体化したテーブルを Spark DataFrame として読み込み、スキーマを確認
- 必要に応じて Databricks 側のテーブルへスキーマ変更を反映
4. Spark DataFrame を Databricks のテーブルに APPEND
5. APPEND したテーブルを元に、Databricks の Target テーブルに MERGE を実行
6. Snowflake 上で STREAM 実体化テーブルを TRUNCATE TABLE
"""
def __init__(
self,
spark: SparkSession,
snowflake_options: Dict[str, Any],
snowflake_account: str,
snowflake_user: str,
snowflake_password: str,
snowflake_database: str,
snowflake_schema: str,
snowflake_warehouse: str,
snowflake_stream_name: str,
snowflake_stream_table: str,
databricks_target_table: str,
databricks_staging_table: str
):
"""
コンストラクタ
Parameters
----------
spark : SparkSession
Databricks 上で実行している SparkSession
snowflake_options : Dict[str, Any]
Spark-Snowflake Connector 用のオプション (sfUrl, sfUser, sfPassword, sfDatabase, sfSchema, sfWarehouse 等)
snowflake_account : str
Snowflake のアカウント名 (例: xxx.yyy.azure)
snowflake_user : str
Snowflake ユーザ名
snowflake_password : str
Snowflake パスワード
snowflake_database : str
対象の Snowflake データベース名
snowflake_schema : str
対象の Snowflake スキーマ名
snowflake_warehouse : str
対象の Snowflake ウェアハウス名
snowflake_stream_name : str
STREAM オブジェクト名 (ソースとなる STREAM)
snowflake_stream_table : str
STREAM を CTAS で実体化するテーブル名
databricks_target_table : str
Databricks 側で最終的に統合するテーブル名
databricks_staging_table : str
Databricks 側で一時的に APPEND するテーブル名(MERGE 前のステージング用)
"""
self.spark = spark
self.snowflake_options = snowflake_options
self.snowflake_account = snowflake_account
self.snowflake_user = snowflake_user
self.snowflake_password = snowflake_password
self.snowflake_database = snowflake_database
self.snowflake_schema = snowflake_schema
self.snowflake_warehouse = snowflake_warehouse
self.snowflake_stream_name = snowflake_stream_name
self.snowflake_stream_table = snowflake_stream_table
self.databricks_target_table = databricks_target_table
self.databricks_staging_table = databricks_staging_table
def run_integration(self):
"""
Snowflake → Databricks の連携フローを実行する。
"""
# 1. STREAM 実体化テーブルの存在チェック (既存データがあれば先に連携)
if self._check_snowflake_table_has_data(self.snowflake_stream_table):
print(f"既存の {self.snowflake_stream_table} にデータが存在するため、先に連携します。")
self._process_snowflake_stream_table()
# 2. Snowflake の STREAM を CTAS によりテーブルに実体化
print(f"{self.snowflake_stream_name} を CTAS で実体化します...")
self._ctas_stream_table()
# 3. 実体化したテーブルを Spark DataFrame として読み込み、スキーマを確認
print(f"{self.snowflake_stream_table} のデータを Spark DataFrame として読み込みます。")
stream_df = self._read_snowflake_stream_table()
print("Databricks 側テーブルのスキーマと比較し、必要に応じてスキーマ進化を反映します。")
self._check_and_evolve_schema(stream_df, self.databricks_staging_table)
# 4. Spark DataFrame を Databricks の Staging テーブルに APPEND
print(f"DataFrame を Databricks テーブル `{self.databricks_staging_table}` に APPEND します。")
self._append_to_databricks_table(stream_df, self.databricks_staging_table)
# 5. Staging テーブルをもとに Target テーブルに MERGE を実行
print(f"MERGE を実行し、`{self.databricks_target_table}` を更新します。")
self._merge_into_target_table()
# 6. Snowflake 上の STREAM 実体化テーブルを TRUNCATE
print(f"Snowflake のテーブル `{self.snowflake_stream_table}` を TRUNCATE します。")
self._truncate_snowflake_table(self.snowflake_stream_table)
print("全ての処理が完了しました。")
def _check_snowflake_table_has_data(self, table_name: str) -> bool:
"""
指定した Snowflake テーブルにデータが存在するかチェックする。
Returns
-------
bool
True: データあり, False: 空
"""
df = self.spark.read \
.format("snowflake") \
.options(**self.snowflake_options) \
.option("dbtable", f"{self.snowflake_database}.{self.snowflake_schema}.{table_name}") \
.load()
return bool(df.head(1))
def _process_snowflake_stream_table(self):
"""
既存の Snowflake STREAM 実体化テーブルにデータが残っている場合の処理。
連携後、Snowflake テーブルを TRUNCATE します。
"""
df = self._read_snowflake_stream_table()
self._check_and_evolve_schema(df, self.databricks_staging_table)
self._append_to_databricks_table(df, self.databricks_staging_table)
self._merge_into_target_table()
self._truncate_snowflake_table(self.snowflake_stream_table)
def _ctas_stream_table(self):
"""
Snowflake 上で CTAS を実行し、STREAM を実体化する。
"""
query = f"""
CREATE OR REPLACE TABLE {self.snowflake_database}.{self.snowflake_schema}.{self.snowflake_stream_table} AS
SELECT *
FROM {self.snowflake_database}.{self.snowflake_schema}.{self.snowflake_stream_name}
"""
self._run_snowflake_query(query)
def _read_snowflake_stream_table(self) -> DataFrame:
"""
Snowflake の STREAM 実体化テーブルを Spark DataFrame として読み込む。
Returns
-------
DataFrame
読み込んだデータ
"""
df = self.spark.read \
.format("snowflake") \
.options(**self.snowflake_options) \
.option("dbtable", f"{self.snowflake_database}.{self.snowflake_schema}.{self.snowflake_stream_table}") \
.load()
return df
def _check_and_evolve_schema(self, source_df: DataFrame, dbx_table_name: str):
"""
Databricks 側テーブルと比較し、スキーマ差分がある場合は必要な変更を反映する。
Delta テーブルの場合は mergeSchema オプションにより自動拡張できるため、現状はスタブ実装です。
Parameters
----------
source_df : DataFrame
Snowflake から読み込んだデータ
dbx_table_name : str
対象の Databricks テーブル名
"""
if not self.spark.catalog.tableExists(dbx_table_name):
print(f"Databricks テーブル `{dbx_table_name}` は存在しないため、スキーマ進化は不要です。")
return
print(f"Databricks テーブル `{dbx_table_name}` のスキーマ進化チェックを実施します。")
# ※必要に応じ、source_df.schema と対象テーブルのスキーマを比較し、
# ALTER TABLE ADD COLUMNS 等の対応を実施してください。
pass
def _append_to_databricks_table(self, df: DataFrame, dbx_table_name: str):
"""
DataFrame を Databricks のテーブルに APPEND する。
Delta テーブルの場合、mergeSchema オプションを有効にして書き込む。
Parameters
----------
df : DataFrame
書き込み対象の DataFrame
dbx_table_name : str
対象の Databricks テーブル名
"""
df.write \
.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.saveAsTable(dbx_table_name)
def _merge_into_target_table(self):
"""
Databricks の Staging テーブルから Target テーブルに MERGE を実行する。
※実際の要件に合わせ、ON 条件やカラム名を適宜修正してください。
"""
merge_sql = f"""
MERGE INTO {self.databricks_target_table} AS T
USING {self.databricks_staging_table} AS S
ON T.id = S.id
WHEN MATCHED AND S.metadata$action = 'DELETE' THEN DELETE
WHEN MATCHED THEN
UPDATE SET
T.col1 = S.col1,
T.col2 = S.col2
WHEN NOT MATCHED THEN
INSERT (id, col1, col2)
VALUES (S.id, S.col1, S.col2)
"""
print("実行する MERGE SQL:")
print(merge_sql)
self.spark.sql(merge_sql)
def _truncate_snowflake_table(self, table_name: str):
"""
Snowflake 上のテーブルを TRUNCATE する。
"""
query = f"TRUNCATE TABLE {self.snowflake_database}.{self.snowflake_schema}.{table_name}"
self._run_snowflake_query(query)
def _run_snowflake_query(self, query: str):
"""
Python の snowflake.connector を利用し、Snowflake 上でクエリを実行する。
"""
conn = snowflake.connector.connect(
user=self.snowflake_user,
password=self.snowflake_password,
account=self.snowflake_account,
warehouse=self.snowflake_warehouse,
database=self.snowflake_database,
schema=self.snowflake_schema
)
try:
with conn.cursor() as cur:
cur.execute(query)
# 必要に応じて明示的な commit を実施
conn.commit()
finally:
conn.close()
# =============================================================================
# 利用例 (Databricks Notebook 等で実行する場合)
# =============================================================================
# SparkSession (spark) は既に生成済みと仮定
# Snowflake 接続情報の例
snowflake_options = {
"sfUrl": "xxxxxx.snowflakecomputing.com",
"sfUser": "YOUR_USER",
"sfPassword": "YOUR_PASSWORD",
"sfDatabase": "YOUR_DATABASE",
"sfSchema": "YOUR_SCHEMA",
"sfWarehouse": "YOUR_WAREHOUSE",
}
integration = SnowflakeToDatabricksIntegration(
spark=spark,
snowflake_options=snowflake_options,
snowflake_account="xxxxxx", # 例: abcde12345.east-us-2.azure
snowflake_user="YOUR_USER",
snowflake_password="YOUR_PASSWORD",
snowflake_database="YOUR_DATABASE",
snowflake_schema="YOUR_SCHEMA",
snowflake_warehouse="YOUR_WAREHOUSE",
snowflake_stream_name="YOUR_STREAM_NAME", # 既存の STREAM 名
snowflake_stream_table="YOUR_STREAM_TABLE", # CTAS で作成するテーブル名
databricks_target_table="your_db.target_table", # MERGE 先の本番テーブル
databricks_staging_table="your_db.staging_table" # 一時的に APPEND するステージングテーブル
)
# メイン処理実行
integration.run_integration()
関連情報
Snowflake にて利用する機能について
Snowflake では以下の機能を主に利用します。
- STREAM
STREAM については以下の検証記事を投稿しています。STREAM は厳密な CDC を実施できるわけではないものの、前回の連携以降に DELETE されたレコードを特定できるため有用です。 DML 操作ごとに保持されるデータパターンが異なるため、ダウンストリームで MERGE を行う際にはそれを考慮する必要があります。
Databricks にて利用する機能について
Databricks では以下の機能を主に利用します。
- 書き込み時のスキーマ展開(Schema Evolution)
- 型拡張(Type widening)