LoginSignup
2
0

More than 1 year has passed since last update.

Delta Live Tablesパイプラインの作成、実行、管理

Posted at

Create, run, and manage Delta Live Tables pipelines | Databricks on AWS [2022/8/1時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

UIやDelta Live Tables APIを用いて、Delta Live Tablesパイプラインの作成、実行、管理、モニタリングをすることができます。また、Databricksジョブのようなオーケストレーションツールを用いてパイプラインを実行することができます。本書では、UIを用いてDelta Live Tablesのタスクを実行することにフォーカスしています。APIの利用に関してはAPI guideをご覧ください。

最初のパイプラインを作成し実行するには、Delta Live Tablesクイックスタート をご覧ください。

パイプラインの作成

  1. 以下のいずれかを行います。

    • サイドバーのWorkflowsをクリックし、Delta Live Tablesタブをクリックし、をクリックします。Create Pipelineダイアログが表示されます。
    • サイドバーでCreateをクリックし、メニューからPipelineを選択します。
  2. Product EditionドロップダウンからDelta Live Tablesの製品エディションを選択します。
    製品エディションを用いることで、お使いのパイプラインの要件に基づいて最適な製品オプションを選択することができます。製品エディションをご覧ください。

  3. Pipeline Nameフィールドにパイプラインの名称を入力します。

  4. Notebook Librariesフィールドにパイプラインのクエリーを含むノートブックのパスを入力するか、ノートブックをブラウズするためにをクリックします。

  5. パイプラインに追加のノートブックを指定するには、Add notebook libraryボタンをクリックします。

    ノートブックは任意の順序で追加できます。Delta Live Tablesはパイプラインの処理グラフ構造を構成するために、自動でデータセットの依存関係を解析します。

  6. パイプラインで実行されるクラスターにSpark設定を追加するには、Add configurationボタンをクリックします。

  7. 検索、クエリーを行えるようにオプションとして、Targetフィールドにデータベース名を指定することができます。データセットの公開をご覧下さい。

  8. オプションとして、Storage Locationフィールドにパイプラインから出力されるデータの格納場所をDBFSからクラウドストレージパスを指定します。Storage Locationが空の場合、システムはデフォルトの格納場所を使用します。

  9. Pipeline ModeTriggeredContinuousを選択します。連続、トリガーパイプラインをご覧下さい。

  10. オプションとして、オートスケーリングの有効・無効、ワーカーノード数の設定などパイプラインのクラスターの設定を変更することができます。クラスターサイズの管理をご覧ください。

  11. オプションとして、パイプラインでPhotonランタイムを使用することができます。Use Photon Accelerationのチェックボックスをクリックしてください。

  12. オプションとして、このパイプラインのDelta Live Tablesランタイムバージョンを変更できます。Channelドロップダウンをクリックしてください。Delta Live Tablesのsettingschannelフィールドをご覧くださいさい。

  13. Createをクリックします。

オプションとして、パイプラインのJSON設定を参照、編集することができます。Create PipelineダイアログのJSONをクリックします。

パイプラインのアップデートのスタート

作成したパイプラインを実行するには、パイプラインのアップデートを開始します。

  1. サイドバーのWorkflowsをクリックし、Delta Live Tablesタブをクリックします。Pipelines一覧が表示されます。
  2. 以下のいずれかを行います。
    • パイプラインのアップデートをすぐにスタートするには、Actionsカラムのをクリックします。システムがパイプラインを起動しているというメッセージを返します。
    • パイプラインをスタートする前にオプションを確認するには、パイプライン名をクリックします。Pipeline detailsページが表示されます。

Pipeline detailsページでは以下のオプションが提供されます。

  • 開発モードとプロダクションモードを切り替えるために、ボタンを使用します。デフォルトではパイプラインは開発モードで実行されます。開発、プロダクションモードをご覧下さい。
  • オプションとして、パイプラインのアクセス権を設定することができます。Permissionsボタンをクリックします。Delta Live Tables access controlをご覧ください。
  • パイプラインの設定を参照、編集するにはSettingsボタンをクリックします。パイプライン設定の詳細については、Delta Live Tablesの設定をご覧ください。

Pipeline detailsページからパイプラインのアップデートをスタートするには、ボタンをクリックします。

例えば、新たな要件に基づいてクエリーを変更したり、新たなカラムを計算するバグを修正するなどのケースで、すでに取り込んだデータを再処理したい場合があるかもしれません。UIからフルリフレッシュの実行をDelta Live Tablesシステムに指示することで取り込み済みのデータを再処理することができます。フルリフレッシュを実行するには、Startボタンの隣のをクリックして、Full refresh allをクリックします。

アップデートあるいはフルリフレッシュをスタートした後に、システムがパイプラインが起動中である旨のメッセージを返します。

アップデートの起動に成功すると、Delta Live Tablesは以下を実行します。

  1. Delta Live Tablesシステムによって作成されたクラスター設定を用いてクラスターを起動します。また、カスタムのクラスター設定を指定することができます。
  2. 存在しないテーブルを作成し、スキーマが既存テーブルに適合していることを確認します。
  3. 最新のデータを用いてテーブルをアップデートします。
  4. アップデートが完了したらクラスターをシャットダウンします。

Pipeline detailsページの下部にあるイベントログを参照することでアップデートの進捗を追跡することができます。

ログエントリーの詳細を参照するにはエントリーをクリックします。Pipeline event log detailsポップアップが表示されます。ログ詳細を含むJSONドキュメントを参照するには、JSONタブをクリックします。

例えば、パフォーマンス、データ品質メトリクスを解析するために、イベントログへのクエリーする方法はDelta Live Tablesのイベントログをご覧ください。

パイプラインのアップデートが完了すると、選択したテーブルのみをリフレッシュするためのアップデートをスタートできるようになります。

選択したテーブルのパイプラインアップデートのスタート

パイプラインで選択したテーブルのみのデータを再処理したい場合があるかと思います。例えば、開発中に単一のテーブルのみを変更し、テストに要する時間を削減したい、あるいは、パイプラインのアップデートが失敗し、失敗したテーブルのみをリフレッシュしたいと考えるかもしれません。

選択したテーブルのみをリフレッシュするアップデートをスタートするには、Pipeline detailsページで以下のことを行います。

  1. Select tables for refreshをクリックします。Select tables for refreshダイアログが表示されます。
    Select tables for refreshボタンが表示されない場合、Pipeline detailsページには最新のアップデートが表示され、アップデートが完了していることを確認してください。最新のアップデートのDAGが表示されない場合、例えば、アップデートが失敗した場合にはSelect tables for refreshボタンは表示されません。

  2. リフレッシュするテーブルを選択するには、それぞれのテーブルをクリックします。選択したテーブルがハイライトされてラベルがつけられます。アップデート対象からテーブルを除外するには、再度テーブルをクリックします。

  3. Refresh selectionをクリックします。

    注意
    Refresh selectionボタンには、括弧内に選択したテーブルの数が表示されます。

選択されたテーブルにすでに取り込まれたデータを再処理するには、Refresh selectionボタンの隣のFull Refresh selectionボタンをクリックします。

失敗したテーブルのパイプラインのアップデートのスタート

パイプライングラフの1つ以上のテーブルのエラーによってパイプラインのアップデートが失敗すると、失敗したテーブルと後段の依存関係のアップデートを起動することができます。

注意
失敗したテーブルに依存していたとしても、除外されたテーブルはリフレッシュされません。

失敗したテーブルをアップデートするには、Pipeline detailsページでRefresh failed tablesをクリックします。

選択した失敗テーブルのみをアップデートするには、

  1. Refresh failed tablesボタンの隣のをクリックし、Select tables for refreshをクリックします。Select tables for refreshダイアログが表示されます。

  2. リフレッシュするテーブルを選択するには、それぞれのテーブルをクリックします。選択したテーブルはハイライトされ、ラベル付けされます。アップデートからテーブルを削除するには、再度テーブルをクリックします。

  3. Refresh selectionをクリックします。

    注意
    Refresh selectionボタンには、括弧内に選択したテーブルの数が表示されます。

選択したテーブルですでに取り込まれたデータを再処理するには、Refresh selectionボタンの隣のをクリックし、Full Refresh selectionをクリックします。

パイプライン詳細の参照

パイプラインのグラフ

パイプラインの起動に成功すると、パイプラインのグラフが表示されます。マウスあるいは、グラフパネルの隅にあるを使用して表示を調整します。

データ品質メトリクスのツールチップを表示するには、パイプライングラフのデータセットのデータ品質の値の上にマウスカーソルを移動します。

選択されたテーブルのみをリフレッシュするアップデートを実行する際、リフレッシュの対象ではないテーブルはパイプライングラフでExcludedトラベルがつきます。

パイプラインの詳細

Pipeline detailsパネルには、パイプライン、アップデートのID、アップデートのステータス、アップデートのタイプ、アップデートのランタイムを含むパイプラインに関する情報と、パイプラインの最新のアップデートに関する情報が表示されます。

また、Pipeline Detailsパネルには、パイプラインに設定されている計算資源のコスト、製品エディション、Databricksランタイムバージョン、チャネルを含むパイプラインの計算クラスターに関する情報を表示します。クラスターのSpark UIが新規タブに表示されますので、Spark UIボタンをクリックします。新規タブでクラスターログを開くには、Logsボタンをクリックします。新規タブでクラスターのメトリクスを開くには、Metricsボタンをクリックします。

Run asの値には、パイプラインのアップデートを実行するユーザーが表示されます。Run asユーザーはパイプラインのオーナーであり、パイプラインのアップデートはこのユーザーのアクセス権でアップデートが実行されます。run asユーザーを変更するには、Permissionsをクリックし、パイプラインのオーナーを変更します。

データセットの詳細

データセットのスキーマやデータ品質メトリクスを含むデータセットの詳細を表示するには、Graphビューでデータセットをクリックします。データセット詳細が表示されます。

新規ウィンドウでパイプラインノートブックを開くには、Pathの値をクリックします。

データセット詳細ビューを閉じて、Pipeline detailsに戻るにはをクリックします。

パイプラインアップデートの停止

パイプラインのアップデートを停止するには、をクリックします。

パイプラインのスケジュール

トリガーパイプラインを手動でスタート、あるいはDatabricksジョブを用いてパイプラインをスケジュール実行することができます。Delta Live TablesのUIで直接単一のパイプラインタスクを用いたジョブを作成するか、ジョブのUIでマルチタスクワークフローにパイプラインタスクを追加することで、処理をスケジューリングすることができます。

単一タスクのジョブを作成し、Delta Live TablesのUIでジョブをスケジューリングするには以下を行います。

  1. Schedule > Add a scheduleをクリックします。パイプラインが1つ以上のスケジュールジョブに含まれている場合、ScheduleボタンにはSchedule (5) というように既存のスケジュールの数が表示されます。
  2. Job nameフィールドにジョブの名前を入力します。
  3. ScheduleScheduledに設定します。
  4. 期間、開始時刻、タイムゾーンを指定します。
  5. パイプラインのスタート、成功、失敗時にアラートを受け取る1つ以上のメールアドレスを指定します。
  6. Createをクリックします。

Databricksジョブでマルチタスクワークフローを作成し、パイプラインタスクを追加するには以下を行います。

  1. ジョブのUIでジョブを作成し、Pipelineタスクを用いてジョブワークフローにあなたのパイプラインを追加します。
  2. ジョブのUIでジョブのスケジュールを作成します。

パイプラインのスケジュールを作成すると、以下のことが行えるようになります。

  • Delta Live TablesのUIで、スケジュール名、一時停止されているかどうか、最後の実行時刻、最後の実行のステータスを含むスケジュールのサマリーの参照。
  • ジョブあるいはパイプラインタスクの編集。
  • スケジュールの編集、スケジュールの停止及び再開。また、スケジュールを作成する際にManualを選択した場合、スケジュールは停止状態となります。
  • 手動によるジョブの実行、ジョブの実行詳細の参照。

パイプラインの参照

サイドバーのWorkflowsをクリックし、Delta Live Tablesタブをクリックします。定義されたすべてのパイプラインの一覧、最近のパイプラインアップデートのステータス、パイプラインのID、パイプラインの作成者を表示するPipelinesページが表示されます。

パイプライン一覧を以下の手段でフィルタリングすることができます。

  • パイプライン名
  • パイプライン名に部分一致するテキスト
  • あなたが所有しているパイプラインのみ
  • あなたがアクセスできるすべてのパイプライン

パイプラインの名前の昇順(A -> Z)、降順(Z -> A)でソートするには、Nameカラムヘッダーをクリックします。

パイプライン一覧を参照する際には、パイプライン名はリンクになっているので、パイプライン詳細を新規タブ、新規ウィンドウで開くためには右クリックしてコンテキストメニューを使用することができます。

設定の編集

パイプラインの設定を参照、編集するには、Pipeline detailsページでSettingsボタンをクリックします。設定を追加、変更、削除することができます。例えば、パイプラインを作成した後にパイプラインの出力をクエリーできるようにするには以下を行います。

  1. Settingsボタンをクリックします。Edit Pipeline Settingsダイアログが表示されます。
  2. Targetフィールドにデータベース名を入力します。
  3. Saveをクリックします。

JSONの定義を参照、編集するには、JSONボタンをクリックします。

設定の詳細については、Delta Live Tablesの設定をご覧下さい。

アップデート履歴の参照

パイプラインのアップデートの履歴とステータスを参照するには、Update historyドロップダウンをクリックします。

アップデートのグラフ、詳細、イベントを参照するには、ドロップダウンのアップデートを選択します。最新のアップデートに戻るには、Show the latest updateをクリックします。

データセットの公開

パイプラインを作成、編集する際、Databricksのメタストアにテーブル定義を公開し、レコードをDeltaテーブルに永続化するために設定targetを指定することができます。

アップデートが完了すると、データベースとテーブルを参照でき、データをクエリーし、後段のアプリケーションでデータを活用できるようになります。

詳細はDelta Live Tables data publishingを参照ください。

クラスターサイズの管理

パイプラインで使用するクラスターのリソースを管理することができます。デフォルトでは、Delta Live Tablesはパフォーマンスとコストを最適化するために、パイプラインのクラスターを自動でスケールします。クラスターのオートスケーリングを使用することをお勧めしますが、オプションとしてパイプラインを作成、編集する体に、オートスケーリングを無効化し、固定のワーカーノードの数を指定することができます。

  • パイプラインを作成する際にEnable autoscalingチェックボックスを無効化し、Workersフィールドにノード数を指定します。

  • 既存のパイプラインでオートスケーリングを無効化するために設定を変更します。以下のスニペットでは、クラスターのオートスケーリングが有効化される設定を示しています。

    JSON
    "clusters": [
       {
         "label": "default",
          "autoscale": {
            "min_workers": 1,
            "max_workers": 5
          }
       }
    ]
    

    以下のスニペットでは、オートスケーリングが無効化され、ワーカーノードの台数は5に固定されています。

    JSON
    "clusters": [
       {
         "label": "default",
         "num_workers": 5
       }
    ]
    

パイプラインの削除

Pipelines一覧、あるいはPipeline detailsページでパイプラインを削除することができます。

  • Pipelines一覧ではActionsカラムのをクリックします。
  • 対象のパイプラインのPipeline detailsページでは、Deleteボタンをクリックします。

パイプラインを削除することで、Delta Live Tablesのシステムからパイプライン定義が削除され、これを取り消すことはできません。

Databricks 無料トライアル

Databricks 無料トライアル

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0