1
1

More than 1 year has passed since last update.

Delta Live Tablesの設定

Last updated at Posted at 2022-01-13

Delta Live Tables settings | Databricks on AWS [2023/2/24時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Delta Live Tablesの設定では、パイプラインを実装する一つ以上のノートブック、開発、ステージング、プロダクション環境でパイプラインをどのように実行するのかを設定するパラメーターを指定します。Delta Live Tablesの設定はJSONで表現され、Delta Live TablesのUIで編集することができます。

設定

フィールド
id
Type:string
グローバルでユニークなパイプラインのID。IDはシステムによって割り当てられ変更することはできません。
name
Type:string
わかりやすいパイプラインの名称。UIでパイプラインジョブを識別するために使用されます。
storage
Type:string
パイプラインの実行に必要な出力データ、メタデータを格納するDBFSあるいはクラウドストレージ。
storageが設定されない場合、システムがデフォルトのdbfs:/pipelines/を使用します。
パイプラインの作成後にstorageを変更することはできません。
configuration
Type:object
パイプラインを実行するクラスターのSpark設定を追加するためのオプションの設定リストです。これらの設定はDelta Live Tablesランタイムに読み込まれ、Spark設定を通じてパイプラインクエリーで使用されます。
各々の要素はkey:valueの形式である必要があります。
configurationの使用例については、パイプラインのパラメータ化を参照ください。
libraries
Type:array of objects
パイプラインのコードと必要とされるアーティファクトを含むノートブックの配列を指定します。サンプルはパイプラインの複数のノートブックの設定を参照ください。
clusters
Type:array of objects
パイプラインを実行するクラスターの設定の配列です。詳細はクラスター設定を参照ください。
development
Type:boolean
パイプラインをdevelopmentあるいはproductionモードで実行するかを示すフラグです。開発、プロダクションモードをご覧ください。
continuous
Type:boolean
パイプラインを連続で稼働させるかどうかを示すフラグです。
デフォルトはfalseです。
target
Type:string
パイプラインの出力データを永続化するデータベース名です。targetを設定する事で、DatabricksのUIからパイプラインの出力データを参照、検索することが可能となります。
channel
Type:string
使用するDelta Live Tablesランタイムのバージョンです。サポートされる値は:
  • ランタイムバージョンに間も無く加えられる変更をパイプラインでテストするにはpreviewを指定します。
  • 現在のランタイムバージョンを指定するにはcurrentを指定します。
channelフィールドはオプションです。デフォルトはcurrentです。プロダクションのワークロードではcurrentバージョンのランタイムを使用することをお勧めします。
edition
Type:string
パイプラインを実行するDelta Live Tablesの製品エディションです。この設定によって、パイプラインの要件に応じたベストな製品エディションを選択することができます。
  • ストリーミング取り込みワークロードを実行するにはCOREを使用します。
  • ストリーミング取り込みとチェンジデータキャプチャ(CDC)ワークロードを実行するにはPROを使用します。
  • ストリーミング取り込みとチェンジデータキャプチャ(CDC)ワークロード、データ品質制約を強制するためのDelta Live Tablesエクスペクテーションを使用するにはADVANCEDを使用します。
editionフィールドはオプションです。デフォルトはADVANCEDです。
photon
Type:boolean
パイプラインを実行するためにPhotonランタイムを使用するかどうかを指定するフラグです。PhotonはDatabricksの高性能Sparkエンジンです。Photonが有効化されたパイプラインの課金と、有効化されていないパイプラインの課金は異なります。
photonフィールドはオプションです。デフォルトはfalseです。
pipelines.maxFlowRetryAttempts
Type:int
リトライ可能な障害が発生した際、パイプラインのアップデートが失敗する前にフローのリトライを試みる最大数です。
デフォルト値は2です。デフォルトでは、リトライ可能な障害が発生した場合、Delta Live Tablesランタイムは、初回の試行を含み3回フローの実行を試みます。
pipelines.numUpdateRetryAttempts
Type:int
リトライ可能な障害が発生した際、パイプラインのアップデートが失敗する前にアップデートのリトライを試みる最大数です。リトライはフルアップデートとして実行されます。
デフォルト値は5です。このパラメーターはproductionモードのアップデート実行の際にのみ適用されます。パイプライン実行がdevelopmentモードの場合にはリトライは行われません。

パイプラインのトリガー間隔

テーブルをアップデートするフロー、あるいはパイプライン全体のトリガー間隔をコントロールするために、pipelines.trigger.intervalを活用することができます。起動されるパイプラインはそれぞれのテーブルを一度のみ処理するので、pipelines.trigger.interval連続パイプラインの場合にのみ使用されます。

ストリーミングクエリーとバッチクエリーとでは異なるデフォルト値があるため、個々のテーブルでpipelines.trigger.intervalを設定することをお勧めします。処理においてパイプラインのグラフ全体のアップデートをコントロールする必要がある場合にのみ、パイプラインにこの値を設定してください。

Pythonのspark_conf、SQLのSETを用いてテーブルにpipelines.trigger.intervalを設定します。

Python
@dlt.table(
  spark_conf={"pipelines.trigger.interval" : "10 seconds"}
)
def <function-name>():
    return (<query>)
SQL
SET pipelines.trigger.interval='10 seconds';

CREATE OR REFRESH LIVE TABLE TABLE_NAME
AS SELECT ...

パイプラインにpipelines.trigger.intervalを設定するには、パイプライン設定のconfigurationオブジェクトに追加します。

JSON
{
  "configuration": {
    "pipelines.trigger.interval": "10 seconds"
  }
}
pipelines.trigger.interval
デフォルト値はフロータイプによって変わります:値は数値と時間単位となります。以下の例は適切な時間単位です:
  • second, seconds
  • minute, minutes
  • hour, hours
  • day, days
以下のように値を定義する際に単数形、複数形を使用することができます。
  • {"pipelines.trigger.interval" : "1 hour"}
  • {"pipelines.trigger.interval" : "10 seconds"}
  • {"pipelines.trigger.interval" : "30 second"}
  • "pipelines.trigger.interval" : "1 minute"}
  • {"pipelines.trigger.interval" : "10 minutes"}
  • {"pipelines.trigger.interval" : "10 minute"}

クラスター設定

Create cluster APIと同じJSONフォーマットを用いて、パイプラインで使用されるクラスターを設定することができます。二種類のクラスタータイプに対する設定を行うことができます。全ての処理が行われるdefaultクラスターと、日次のメンテナンスタスクを実行するmaintenanceクラスターです。それぞれのクラスターはlabelフィールドで識別することができます。

クラスターのプロパティの指定はオプションであり、指定されない値に関してはシステムはデフォルト値を使用します。

注意

  • クラスター設定でSparkのバージョンを指定することはできません。Delta Live Tablesのクラスターは、最新機能を取り込むように定期的にアップデートされるDatabricksランタイムのカスタムバージョンで動作します。マニュアルでバージョンを設定するとパイプラインが失敗する場合があります。

  • Delta Live Tablesのクラスターは使用されていない時に自動でシャットダウンされるため、クラスター設定のautotermination_minutesを設定するクラスターポリシーの参照はエラーとなります。クラスターのシャットダウンの挙動を制御するために、developmentモードかproductionモードを使用するか、Spark設定pipelines.clusterShutdown.delayを使用することができます。以下の例では、pipelines.clusterShutdown.delayを5秒に設定しています。

    JSON
    {
      "configuration": {
      "pipelines.clusterShutdown.delay": "60s"
    }
    

    productionモードが有効化されている場合、pipelines.clusterShutdown.delayのデフォルト値は0 secondsです。developmentモードが有効化されている場合、デフォルト値は2 hoursになります。

  • クラスター設定でnum_workersを0に設定すると、クラスターはシングルノードクラスターとして作成されます。オートスケーリングクラスターを設定し、min_workersを0、max_workersを0に設定してもシングルノードクラスターが作成されます。

    オートスケーリングクラスターを設定し、min_workersのみを0に設定すると、クラスターはシングルノードクラスターとしては作成されません。このクラスターは停止されるまで常に1台のワーカーが存在することになります。

    Delta Live Tablesでシングルノードクラスターを作成するクラスター設定のサンプルです。

    JSON
    {
      "clusters": [
        {
          "label": "default",
          "num_workers": 0
        }
      ]
    }
    

注意
お使いのストレージにアクセスするためにインスタンスプロファイルや他の設定が必要な場合には、defaultクラスター、maintenanceクラスターの両方に設定してください。

defaultクラスター、maintenanceクラスターの設定例を以下に示します。

JSON
{
  "clusters": [
    {
      "label": "default",
      "node_type_id": "c5.4xlarge",
      "driver_node_type_id": "c5.4xlarge",
      "num_workers": 20,
      "spark_conf": {
        "spark.databricks.io.parquet.nativeReader.enabled": "false"
      },
      "aws_attributes": {
        "instance_profile_arn": "arn:aws:..."
      }
    },
    {
      "label": "maintenance",
      "aws_attributes": {
        "instance_profile_arn": "arn:aws:..."
      }
    }
  ]
}

クラウドストレージの設定

AWSのS3ストレージへのアクセスを設定するには、AWSインスタンスプロファイルを使用します。Delta Live Tables APIあるいはDelta Live Tables UIでパイプラインを作成編集する際にインスタンスプロファイルの設定を追加することができます。また、パイプラインクラスターのJSON設定を直接編集することでAWSインスタンスプロファイルを設定することができます。

  1. パイプラインのPipeline detailsページで、Settingsボタンをクリックします。Pipeline settingsページが表示されます。
  2. JSONボタンをクリックします。
  3. クラスター設定のaws_attributes.instance_profile_arnフィールドに、インスタンスプロファイル設定を入力します。
JSON
{
  "clusters": [
    {
      "label": "default",
      "aws_attributes": {
        "instance_profile_arn": "arn:aws:..."
      }
    },
    {
      "label": "maintenance",
      "aws_attributes": {
        "instance_profile_arn": "arn:aws:..."
      }
    }
  ]
}

defaultとmaintenanceクラスターに対してインスタンスプロファイルを指定する必要があります。

Delta Live Tablesパイプラインのクラスターポリシーを作成する際に、インスタンスプロファイルを設定することもできます。サンプルに関しては、knowledge baseをご覧ください。

クラスターポリシー

注意
Delta Live Tablesクラスターを設定するためにクラスターポリシーを使う際には、defaultmaintenanceクラスターの両方で単一のポリシーを適用することをお勧めします。

パイプラインクラスターでクラスターポリシーを設定するには、cluster_typeフィールドをdltに設定したポリシーを作成します。

JSON
{
  "cluster_type": {
    "type": "fixed",
    "value": "dlt"
  }
}

パイプライン設定で、clusterのpolicy_idフィールドをポリシーIDの値に設定します。以下の例では、IDC65B864F02000008のクラスターポリシーを用いてdefaultmaintenanceクラスターを設定します。

JSON
{
  "clusters": [
    {
      "label": "default",
      "policy_id": "C65B864F02000008",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    },
    {
      "label": "maintenance",
      "policy_id": "C65B864F02000008"
    }
  ]
}

クラスターポリシーの作成、使用のサンプルについては、Define limits on pipeline clustersをご覧ください。

サンプル

パイプラインとクラスターの設定

以下の例では、ストレージとしてのDBFS、小規模の1ノードのクラスターを使用して、example-notebook_1で実装されたパイプラインを実行するトリガーモードのパイプラインを設定しています。

JSON
{
  "name": "Example pipeline 1",
  "storage": "dbfs:/pipeline-examples/storage-location/example1",
  "clusters": [
    {
      "num_workers": 1,
      "spark_conf": {}
    }
  ],
  "libraries": [
    {
      "notebook": {
         "path": "/Users/user@databricks.com/example_notebook_1"
      }
    }
  ],
  "continuous": false
}

パイプラインの複数のノートブックの設定

複数のノートブックを用いたパイプラインを設定するにはlibrariesフィールドを使用します。Delta Live Tablesは、パイプラインに対する処理グラフを構築するためにデータセットの依存関係を自動で解析するので、ノートブックの順序は問いません。以下の例では、example-notebook_1example-notebook_2で定義されるデータセットを含むパイプラインを作成します。

JSON
{
  "name": "Example pipeline 3",
  "storage": "dbfs:/pipeline-examples/storage-location/example3",
  "libraries": [
    { "notebook": { "path": "/example-notebook_1" } },
    { "notebook": { "path": "/example-notebook_2" } }
  ]
}

ワークスペースファイルからコードをインポート

パイプラインコードを含むノートブックを設定するためにlibrariesフィールドを用いることに加え、Databricks repoに格納されているソースコードに含まれるパイプラインコードを設定することができます。パイプラインコードを含むソースファイルへのパスを指定するには、fileフィールドを使用します。

JSON
{
   "libraries": [
     { "file": { "path": "/Repos/<user_name>@databricks.com/Apply_Changes_Into/apply_changes_into.sql" } },
     { "file": { "path": "/Repos/<user_name>@databricks.com/Apply_Changes_Into/apply_changes_into.py" } }
   ]
}

Delta Live Tablesによる開発ワークフローの作成

開発、ステージング、プロダクションに対応する別々のDelta Live Tablesパイプラインを作成することができるので、作成するデータのコンシューマーに影響を与えることなしに、変換ロジックをテスト、デバッグすることができます。異なるデータベースをターゲットとしつつも同じコードを使用する別々のパイプラインを作成するだけです。

完全に分離された開発環境と、開発環境からプロダクション環境にプッシュするシンプルなワークフローを構築するためにDatabricksのReposの機能と組み合わせることができます。

JSON
{
  "name": "Data Ingest - DEV user@databricks",
  "target": "customers_dev_user",
  "libraries": ["/Repos/user@databricks.com/ingestion/etl.py"],
}
JSON
{
  "name": "Data Ingest - PROD",
  "target": "customers",
  "libraries": ["/Repos/production/ingestion/etl.py"],
}

パイプラインのパラメータ化

データセットを定義するPython、SQLコードはパイプラインの設定でパラメータ化することができます。パラメータ化を用いることで以下のユースケースを実現することができます。

  • コードと長いパスや変数の分離
  • 開発、ステージング環境で処理するデータ量を削減し、テストを高速化
  • 複数のデータソースに対する処理で同じ変換ロジックを再利用

以下の例では、開発用パイプラインで使用するデータを入力データのサブセットに限定するために、設定値startDateを使用しています。

SQL
CREATE LIVE TABLE customer_events
AS SELECT * FROM sourceTable WHERE date > ${mypipeline.startDate};
Python
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
JSON
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
JSON
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Databricks 無料トライアル

Databricks 無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1