はじめに
DatabricksでスペシャリストSAをしている中里です。本記事では、Databricks Labsで公開されているマイグレーションツール Lakebridge と、私がコア開発者の一人として開発に携わったLLMベースのコード変換機能 Switch の概要から実装の裏側まで紹介します。
Lakebridgeは、様々なデータプラットフォームからDatabricksへのマイグレーションを支援するためのツールです。主にCLIで提供されており、ワークロード評価(Profiler)、コード分析(Analyzer)、コード変換(Transpiler)、データ照合(Reconciler)など、マイグレーションの各フェーズに必要な機能を搭載しています。
コード変換機能には複数の変換エンジンが用意されており、Switchはその中の一つとして、LLMを活用して多様なコード(SQL、Python、Scalaなど)をDatabricksノートブックに変換する新しいアプローチを提供します。従来の構文解析ベースの変換ツールとは異なり、LLMのセマンティック理解を活用することで、より柔軟で幅広い変換に対応しています。
本記事ではLakebridgeおよびSwitchの概要をかいつまんで紹介しますが、詳細は公式ドキュメントをご覧ください。
Lakebridge概要
Lakebridgeは複数の変換エンジン(Transpiler)を搭載しており、用途や要件に応じて使い分けることができます。
BladeBridge
BladeBridgeは、2025年2月にDatabricksが買収したテクノロジーをベースとした、ルールベースの変換エンジンです(買収に関するプレスリリース)。
- 対応ソース: Oracle、Teradata、MSSQL/Synapse、Netezza、Informatica PC/Cloud、DataStage
- 特徴: 設定ファイルによるカスタマイズが可能で、大規模なマイグレーションプロジェクトに対応
Morpheus
Morpheusは構文木(AST)ベースの変換エンジンで、高い正確性を重視した変換を行います。
- 対応ソース: MSSQL/Synapse、Snowflake(dbt repointingを含む)
- 特徴: 変換の正確性を保証できない場合は警告やエラーを出力し、確実性を重視
Switchとは
Switchは、Lakebridgeの3番目の変換エンジンとして2025年11月のv0.11.0から搭載されました。従来の構文解析ベースの変換エンジンとは異なり、LLMがコードの意図を理解することで、より柔軟で幅広い変換を実現しています。
特徴として、SwitchはDatabricksワークスペースの機能(ジョブ、サーバーレスコンピューティング、Mosaic AIモデルサービングエンドポイント、Deltaテーブル)を用いて動作します。出力形式はデフォルトでDatabricks Pythonノートブックですが、YAMLやJSONなどのテキストベースの形式もサポートしており、実験的ながらDatabricks SQLノートブックへの変換にも対応しています。
変換ソースについて、SwitchはLLMベースのため、カスタムプロンプトを指定すれば様々なソースに対応可能です。ビルトインプロンプトとしては、主要なSQL方言(MSSQL、MySQL、PostgreSQL、Oracle、Snowflake、Teradata、Redshift、Synapse、Netezzaなど)の他、Python、Scalaなどのプログラミング言語もサポートしています。
使い方
Databricks CLIがインストールされている前提で、まずLakebridgeをインストールします。
databricks labs install lakebridge
次に、LakebridgeにSwitchをインストールします。--include-llm-transpiler trueを指定することでSwitchがインストールされます。
databricks labs lakebridge install-transpile --include-llm-transpiler true
Switchのインストールが完了したら、以下のコマンドで変換を実行できます。これらは最小限のコマンドラインパラメーターです。より詳細な設定は設定ファイル (ワークスペース上のswitch_config.yml) で行えます。
databricks labs lakebridge llm-transpile \
--input-source /local/path \
--output-ws-folder /Workspace/path \
--source-dialect snowflake \
--accept-terms true
変換例
実際にどのような変換が行われるのか、T-SQLのストアドプロシージャをDatabricks Pythonノートブックに変換する例を見てみましょう。この例では、.sqlファイル(T-SQLストアドプロシージャ)を.pyファイル(Databricks Pythonノートブック)に変換しています。
入力: T-SQLストアドプロシージャ
CREATE PROCEDURE [dbo].[DEMO_FORECAST_OUTLIER_CHECK_UPDATE]
@SchemaName NVARCHAR(128),
@OutlierMultiplier DECIMAL(5,2) = 1.30
AS
BEGIN
DECLARE @Result INT = 0;
DECLARE @ErrorMsg NVARCHAR(MAX);
DECLARE @CurrentDate DATE;
-- 一時テーブルの作成
CREATE TABLE #TEMP_OUTLIER_INFO (
LocationId NVARCHAR(10),
OutlierThreshold DECIMAL(8,2)
);
BEGIN TRY
BEGIN TRAN FORECAST_OUTLIER_CHECK;
-- 1) 動的 SQL でシステム日付を取得
DECLARE @SQL NVARCHAR(MAX);
SET @SQL = N'
SELECT @CurrentDateOut = SystemDate
FROM ' + QUOTENAME(@SchemaName) + N'.SystemDateTable;
';
EXEC sp_executesql @SQL, N'@CurrentDateOut DATE OUTPUT',
@CurrentDateOut = @CurrentDate OUTPUT;
-- 2) 外れ値閾値を計算して一時テーブルに挿入
-- (省略)
-- 3) UPDATE with JOIN で外れ値を更新
SET @SQL = N'
UPDATE f
SET f.ForecastValue = t.OutlierThreshold * @Multiplier
FROM ' + QUOTENAME(@SchemaName) + N'.ForecastTable f
INNER JOIN #TEMP_OUTLIER_INFO t ON f.LocationId = t.LocationId
WHERE f.ForecastValue > t.OutlierThreshold * @Multiplier
';
EXEC sp_executesql @SQL, N'@Multiplier DECIMAL(5,2)',
@Multiplier = @OutlierMultiplier;
COMMIT TRAN FORECAST_OUTLIER_CHECK;
END TRY
BEGIN CATCH
SET @Result = 2;
IF (@@TRANCOUNT > 0) ROLLBACK TRAN FORECAST_OUTLIER_CHECK;
THROW;
END CATCH;
RETURN @Result;
END;
出力: Databricks Pythonノートブック
出力はDatabricks Pythonノートブック形式(.py)で生成されます。先頭の# Databricks notebook sourceはノートブック形式を示すコメントで、# COMMAND ----------はノートブックのセル分割を指定するコメントです。
# Databricks notebook source
# Create widgets for input parameters
dbutils.widgets.text("SchemaName", "")
dbutils.widgets.text("OutlierMultiplier", "1.30")
# COMMAND ----------
# パラメータ値を取得
schema_name = dbutils.widgets.get("SchemaName")
outlier_multiplier = float(dbutils.widgets.get("OutlierMultiplier"))
# COMMAND ----------
# 一時的なDeltaテーブルを作成して #TEMP_OUTLIER_INFO を置き換え
spark.sql("DROP TABLE IF EXISTS TEMP_OUTLIER_INFO")
spark.sql("""
CREATE TABLE TEMP_OUTLIER_INFO (
LocationId STRING,
OutlierThreshold DECIMAL(8,2)
)
""")
# COMMAND ----------
# ロールバックに備えてテーブルの状態を保存
try:
forecast_hist = spark.sql(
f"DESCRIBE HISTORY {schema_name}.ForecastTable LIMIT 1"
).collect()[0]
forecast_restore_ts = forecast_hist["timestamp"]
except:
forecast_restore_ts = None
# COMMAND ----------
try:
# システム日付テーブルから現在日付を取得
current_date = spark.sql(
f"SELECT SystemDate FROM {schema_name}.SystemDateTable"
).first()["SystemDate"]
# MERGE INTO を使用して予測値を更新
spark.sql(f"""
MERGE INTO {schema_name}.ForecastTable f
USING (
SELECT f.LocationId, t.OutlierThreshold * {outlier_multiplier} AS AdjustedValue
FROM {schema_name}.ForecastTable f
INNER JOIN TEMP_OUTLIER_INFO t ON f.LocationId = t.LocationId
WHERE f.ForecastValue > t.OutlierThreshold * {outlier_multiplier}
) src
ON f.LocationId = src.LocationId
WHEN MATCHED THEN UPDATE SET f.ForecastValue = src.AdjustedValue
""")
except Exception as e:
# Deltaテーブルのリストア機能を使用してロールバック
if forecast_restore_ts is not None:
spark.sql(f"RESTORE TABLE {schema_name}.ForecastTable TO TIMESTAMP AS OF '{forecast_restore_ts}'")
raise e
finally:
spark.sql("DROP TABLE IF EXISTS TEMP_OUTLIER_INFO")
# COMMAND ----------
# MAGIC %md
# MAGIC ## 静的構文チェック結果
# MAGIC 静的チェック中に構文エラーは検出されませんでした。
# MAGIC ただし、一部の問題は実行時にのみ検出される可能性があるため、コードを注意深く確認してください。
変換後のノートブックの最後には、静的構文チェックの結果がセクションとして追加されます。構文エラーが検出された場合はエラー情報が表示され、手動での修正が必要です。エラーがない場合でも、実行時にのみ検出される問題がある可能性があるため、コードのレビューを推奨しています。
変換のポイント
この変換では、T-SQL特有の構文をDatabricksで動作するPythonコードに変換しています。主な変換パターンは以下の通りです:
| T-SQLの構文 | Databricksでの変換 |
|---|---|
@Parameter |
dbutils.widgets |
#TEMP_TABLE |
Delta Table(明示的にDROP) |
BEGIN TRAN / ROLLBACK |
try-except + RESTORE TABLE
|
UPDATE ... FROM ... JOIN |
MERGE INTO |
NVARCHAR / VARCHAR
|
STRING |
sp_executesql |
f-stringによる動的SQL |
なお、一時テーブルやマルチステートメントトランザクションはDatabricksのロードマップに含まれており、これらの機能が利用可能になった際には、Switchのビルトインプロンプトも順次アップデートして、よりネイティブなDatabricksの機能を活用した変換を提供する予定です。
実装の工夫点
ここからは、Switchの実装において工夫した点を紹介します。これらの詳細はドキュメントには記載されていませんが、LakebridgeでSwitchをインストールするとワークスペース上にノートブック等のコードが配置されるので、中身を確認できます。
バッチ推論処理
LLMによる変換処理の参考にしたのは、以下のサンプルノートブックです。これをより堅牢に動かすために、いくつかの工夫をしています。
非同期バッチ推論
asyncioとhttpxを使用した非同期処理により、複数の変換リクエストを並行して処理します。ファイルごとに独立して並列実行されるため、あるファイルの変換が失敗しても他のファイルが共倒れになることはありません。並列度(concurrency)パラメータでエンドポイントへの同時リクエスト数を制御できます。
リトライ戦略
tenacityライブラリを使用し、指数バックオフによるリトライを実装しています。エラー種別に応じて戦略を変えており、バックプレッシャー(HTTP 429, 503)は最大20回、その他のエラーは最大5回リトライします。パラメータバリデーションエラー(HTTP 400)はリトライ不要のため即時停止します。
長い出力への対応
LLMの出力がトークン上限に達した場合、自動的に継続リクエストを送信して完全な出力を取得します。
# finish_reason が "length" の場合、継続リクエストを送信
if finish_reason != "length":
break
messages.append({"role": "assistant", "content": content})
messages.append({
"role": "user",
"content": f"The previous response ended with: '{content[-50:]}'. "
f"Please continue exactly from this point."
})
ビルトインプロンプト
プロンプトは「共通プロンプト」と「方言ごとのプロンプト」を分離してYAMLファイルで管理し、OmegaConfを使用して実行時に組み合わせています。方言ごとのプロンプトを作成するにあたっては、sqlglotなどのライブラリの実装を参考にしました。
共通プロンプト(抜粋)
common_python_instructions_and_guidelines: |
# Instructions
1. Convert the input SQL statements into Python code suitable for
execution in a Databricks notebook.
2. Strictly adhere to the guidelines provided below.
# Guidelines
## Spark SQL Execution:
- Use `spark.sql()` to execute SQL statements.
- Write SQL statements directly within `spark.sql()`.
## Transaction and Rollback Handling:
- Simulate transactions using Python's `try-except-finally` blocks.
- Use `RESTORE TABLE TIMESTAMP AS OF` for Delta table rollback.
## UPDATE Statements:
- Databricks does not support `FROM` clauses in `UPDATE`.
- Use `MERGE INTO` for statements that require a join.
方言ごとのプロンプト(T-SQLの例、抜粋)
system_message: |
Convert T-SQL code to Python code that runs on Databricks.
${common_python_instructions_and_guidelines}
# T-SQL Specific Guidelines
## Function Mapping:
- `GETDATE()` => `current_timestamp()`
- `ISNULL(expr, replacement)` => `nvl(expr, replacement)`
- `CONVERT(type, expr)` => `cast(expr AS type)`
## Data Type Mapping:
- `NVARCHAR` => `STRING`
- `UNIQUEIDENTIFIER` => `STRING`
- `MONEY` => `DECIMAL` or `DOUBLE`
few_shots:
- role: user
content: |
CREATE TABLE #RecentOrders (...);
SELECT TOP 10 * FROM #RecentOrders;
- role: assistant
content: |
spark.sql("CREATE OR REPLACE TABLE RecentOrders (...)")
spark.sql("SELECT * FROM RecentOrders LIMIT 10")
セル分割
変換後のPythonコードをノートブックのセルに分割する処理は、LLMを使用せず、完全にルールベースで実装しています。セル分割は構文的なタスクであり、LLMにセルの分割を任せるよりもAST解析による方が確実かつ高速に動作するためです。
class CellSplitHelper:
COMMAND_MARKER = "# COMMAND ----------"
def split_cells(self, code: str) -> str:
# 1. AST解析でトップレベルノードを抽出
tree = ast.parse(code)
top_level_nodes = self._get_top_level_nodes(tree)
# 2. 近接ノードをマージ
merged_blocks = self._merge_nearby_nodes(top_level_nodes)
# 3. コメント・空行を適切に処理
adjusted_blocks = self._adjust_block_start_lines(merged_blocks, lines)
# 4. COMMANDマーカーで分割
output_lines = self._extend_and_format_blocks(adjusted_blocks, lines)
return "\n".join(output_lines)
開発プロセス
Switchの開発は、ほぼVisual Studio CodeとClaude Codeで行いました。
ただし、いわゆる「Vibe Coding」ではなく、人間がナビゲーター、AIがドライバーという役割分担で、計画的に協働しながら開発を進めました。おおまかな流れは以下の通りです。
- Planモードで計画作成: タスクごとにPlanモードで実装計画をMarkdownで作成し、この段階で内容を徹底的に詰める
-
セッション管理をしながら実装:
/clearで新規セッションを作成し、計画に基づいて実装。キリの良いタイミングでコミットし、/compactまたは/clearでセッションをなるべくクリーンに保つ - Approval設定を使い分けてレビュー: 単純な実装についてはAuto Approvalを使用することもあったが、それなりに複雑なロジックを実装する際にはManual Approvalを使用して生成されたコードを逐一レビュー、早い段階で軌道修正を行う
また、品質担保のためにCI/CDパイプラインで以下を強制しています。
- lint(Pylint, Ruff)
- フォーマット(Black, isort)
- 高いテストカバレッジ
開発時のつぶやき
今後の展望
Switchは今後も機能拡充を予定しています:
- Spark Declarative Pipelineへの変換サポート: Delta Live Tables (DLT)の後継であるSpark Declarative Pipelineへの変換をサポート予定
- ビルトインプロンプトの拡充: より多くのSQL方言や変換パターンに対応
- エージェント型への進化: より自律的・高度な変換を行うエージェント型アーキテクチャを検討中
まとめ
本記事では、LakebridgeのLLMベース変換機能「Switch」の概要から実装の裏側まで紹介しました。
- Lakebridgeは複数の変換エンジンを搭載したマイグレーション支援ツール
- Switchは2025年11月のv0.11.0から搭載された3番目の変換エンジンで、LLMを活用した柔軟な変換を実現
- 非同期バッチ推論、リトライ戦略、AST解析によるセル分割など、堅牢な実装を目指した
- 開発はClaude Codeを活用し、人間がナビゲーター、AIがドライバーという役割分担で進めた
以上の記事が、LakebridgeおよびSwitchの理解に役立てば幸いです。