1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

このようなページができていたとは。

Delta Live Tablesとは

Delta Live Tables は、信頼性と保守性に優れた抽出、変換、読み込み (ETL) パイプラインの作成を簡略化するために設計された宣言型フレームワークです。 取り込むデータとその変換方法を指定すると、オーケストレーション、コンピュート管理、モニタリング、データ品質の適用、エラー処理など、データパイプラインの管理の主要な側面が Delta Live Tables 自動化されます。

エクスペクテーションとは

エクスペクテーションを定義することで、Delta Live Tablesパイプラインを流れるデータを検証する品質制約を適用します。エクスペクテーションに違反したレコードを削除、あるいはパイプラインの停止を行うことができます。

エクスペクテーションの可搬性の向上

そして、タイトルの話です。最初のページにはいくつかのエクスペクテーションのベストプラクティスがまとめられています。最初の例に膝を打ったので動かしてみます。

考え方は以下の通りです。

  • パイプラインロジックとエクスペクテーションの分離: エクスペクテーションの定義をパイプラインのロジックとは別に格納します。パイプラインの定義にエクスペクテーションを組み込むことは可能ですが、可搬性や再利用性を損なってしまいます。モジュール化することで複数のデータセットやパイプラインにエクスペクテーションを簡単に適用できます。
  • カスタムタグの適用: カスタムタグを追加して、関連するエクスペクテーションのグループを作成します。タグに基づいてエクスペクテーションをフィルタリングできるようになります。
  • 一貫したエクスペクテーションの適用: 類似したデータセットに一貫性のあるエクスペクテーションを適用します。複数のデータセットとパイプラインで同じエクスペクテーションを使用して、同一のロジックを評価します。

まず、エクスペクテーションの定義をPythonファイルに記載します。ディクショナリーで名前nameと制約constraint、カスタムタグtagを定義しています。

rules_module.py
def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

このPythonファイルと同じパスにパイプラインロジックを記述するノートブックを作成します。こちらの機能によって、パイプラインノートブックから上記rules_module.pyをPythonモジュールとしてインポートすることができます。

以下の肝はfrom rules_module import *と関数get_rulesです。get_rulesはタグを引数として、tagがマッチする制約条件をディクショナリーで返却します。これによって、以下にあるようにget_rules('validity')でタグvalidityに合致する制約条件をエクスペクテーションとしてテーブルに適用することができます。なるほど。二つ目のテーブルではタグmaintainedのエクスペクテーションを適用しています。

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col


def get_rules(tag):
  """
    データ品質ルールをテーブルから読み込む
    :param tag: 検索するタグ
    :return: タグに一致したルールの辞書
  """
  return {
    row['name']: row['constraint']
    for row in get_rules_as_list_of_dict()
    if row['tag'] == tag
  }

@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table
@dlt.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
  )

このノートブックをソースコードとして設定してパイプラインとして実行すると、以下のように期待した通りにエクスペクテーションが適用されています。

Screenshot 2025-01-23 at 14.58.16.png
Screenshot 2025-01-23 at 14.58.22.png

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?