宣言型ETLを活用して信頼性が高く、運用が容易なデータパイプラインを構築できるDelta Live Tables(DLT)ですが、実際の運用で活用しようとすると色々な悩みに直面するかと思います。
例えば、複雑な処理を必要とするデータパイプラインを構築したい、データソースへのアクセスロジックを共通化したいというケースでは、ソースコードのモジュール化が重要となります。
しかしながら、DLTでソースコードのモジュール化をするには若干工夫が必要となります。
現時点では以下のアプローチがあります。
- **ReposのFiles in Repos**機能を用いて、pyファイルをimportする。
- DLTパイプラインを定義するノートブックを複数に分割する。
- SparkのUDF(ユーザー定義関数)としてロジックを定義する。
1の方法は以下の記事でも説明されていますが、本記事ではDLTの文脈での利用方法を説明します。
準備
Admin ConsoleでRepos配下のFiles in Reposを有効化します。
DatabricksのサイドバーからReposにアクセスして、リポジトリを作成しておきます。
ロジックの定義
-
DatabricksのサイドバーからReposにアクセスします。
-
上で作成したリポジトリにアクセスします。ここでは
project-test
というリポジトリとします。 -
ロジックを記述するpyファイルを作成します。
-
作成したいフォルダの右にある下向き矢印をクリックし、メニューからCreate > Fileを選択します。
-
ここではファイル名を
func.py
とします。 -
シンプルなロジックを記述しています。
Python
DLTパイプラインの定義
-
DLTパイプラインを定義するノートブックを作成しオープンします。
-
Reposへのパスを追加します。
Python
import sys
sys.path.append("/Workspace/Repos/takaaki.yayoi@databricks.com/project-test")
```
カスタムモジュールをimportします。
```py:Python
import func
```
- パイプラインのロジックから
.py
ファイルの関数を呼び出します。
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
@dlt.table(
comment="関数呼び出し元テーブル"
)
def caller():
func.test()
return (spark.read.json(json_path))
Delta Live Tablesの動作確認
上記パイプラインのロジックを記述しているノートブックを実行して、動作を確認します。