1
0

More than 1 year has passed since last update.

Delta Live Tables(DLT)でロジックをpyファイルにモジュール化して共通化する

Last updated at Posted at 2022-02-09

宣言型ETLを活用して信頼性が高く、運用が容易なデータパイプラインを構築できるDelta Live Tables(DLT)ですが、実際の運用で活用しようとすると色々な悩みに直面するかと思います。

例えば、複雑な処理を必要とするデータパイプラインを構築したい、データソースへのアクセスロジックを共通化したいというケースでは、ソースコードのモジュール化が重要となります。

しかしながら、DLTでソースコードのモジュール化をするには若干工夫が必要となります。

現時点では以下のアプローチがあります。

  1. ReposFiles in Repos機能を用いて、pyファイルをimportする。
  2. DLTパイプラインを定義するノートブックを複数に分割する。
  3. SparkのUDF(ユーザー定義関数)としてロジックを定義する。

1の方法は以下の記事でも説明されていますが、本記事ではDLTの文脈での利用方法を説明します。

準備

Admin ConsoleでRepos配下のFiles in Repos有効化します。
Screen Shot 2022-02-09 at 10.26.35.png

DatabricksのサイドバーからReposにアクセスして、リポジトリを作成しておきます。

ロジックの定義

  1. DatabricksのサイドバーからReposにアクセスします。
  2. 上で作成したリポジトリにアクセスします。ここではproject-testというリポジトリとします。
  3. ロジックを記述するpyファイルを作成します。
  4. 作成したいフォルダの右にある下向き矢印をクリックし、メニューからCreate > Fileを選択します。
  5. ここではファイル名をfunc.pyとします。
  6. シンプルなロジックを記述しています。

    Python
    def test():
    print("test")
    

    Screen Shot 2022-02-09 at 10.39.58.png

DLTパイプラインの定義

  1. DLTパイプラインを定義するノートブックを作成しオープンします。
  2. Reposへのパスを追加します。

    Python
    import sys
    sys.path.append("/Workspace/Repos/takaaki.yayoi@databricks.com/project-test")
    

    カスタムモジュールをimportします。

    Python
    import func
    
  3. パイプラインのロジックから.pyファイルの関数を呼び出します。

Python
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"

@dlt.table(
  comment="関数呼び出し元テーブル"
)

def caller():
  func.test()
  return (spark.read.json(json_path))

Delta Live Tablesの動作確認

上記パイプラインのロジックを記述しているノートブックを実行して、動作を確認します。
Screen Shot 2022-02-09 at 10.45.35.png

Databricks 無料トライアル

Databricks 無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0