1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Delta Live Tablesクックブック

Last updated at Posted at 2021-11-17

Delta Live Tables cookbook | Databricks on AWS [2021/10/21時点]の翻訳です。

注意
この機能はパブリックプレビューです。アクセスするためには、Delta Live Tablesへのアクセスをリクエストしてください。(訳者注:Databricks担当にご連絡ください)

本書では、Delta Live Tablesのパイプラインにおける一般的なタスクを実装するためのお勧めとソリューションを説明しています。

エクスペクテーションをポータブルかつ再利用可能にする

シナリオ

共通のデータ品質に関するルールを複数のテーブルに対して適用したい、あるいは、パイプライン開発者とは別のチームメンバーがデータ品質に関するルールを開発、メンテナンスしたいと考えています。

ソリューション

パイプラインの実装とは別にデータ品質ルールを管理します。DBFS(Databricksファイルシステム)やクラウドストレージ、Deltaテーブルなど信頼性があり、容易にアクセスできるフォーマットで、ルールを格納します。以下の例では、ルールを管理するためにrules.csvというCSVファイルをDBFSに格納しています。rules.csvのそれぞれのルールはタグで分類されます。どのルールを適用するのかを決定するために、データセット定義でこのタグを使用します。

name, constraint, tag
website_not_null,"Website IS NOT NULL",validity
location_not_null,"Location IS NOT NULL",validity
state_not_null,"State IS NOT NULL",validity
fresh_data,"to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",maintained
social_media_access,"NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",maintained

以下のPythonのサンプルでは、rules.csvに格納されたルールに基づいてデータ品質のエクスペクテーションを定義しています。get_rules()関数はrules.csvからルールを読み込み、関数に渡された引数tagにマッチするルールを含むディクショナリーを返却します。データ品質制約を適用するために、このディクショナリーは@dlt.expect_all_*()デコレーターで適用されます。例えば、validityタグがついているルールに失敗したレコードはraw_farmers_marketから削除されます。

Python
import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from csv file
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.format("csv").option("header", "true").load("/path/to/rules.csv")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

SQLでPython UDFを使う

シナリオ

Delta Live Tablesデータセットを定義するために、SQLのシンプルさを必要とするが、SQLでサポートされていない変換処理を行いたいと考えています。

ソリューション

お使いのSQLクエリーでPythonのユーザー定義関数(UDF)を使用します。以下の例では、入力引数の二乗を返却するUDFsquare()を定義、登録し、SQLでUDFsquare()をコールしています。

  1. UDFを定義、登録する。

    Default LanguagePythonに設定してノートブックを作成し、以下のコードをセルに追加します。

    Python
    def square(i: int) -> int:
        """
        Simple udf for squaring the parameter passed.
        :param i: column from Pyspark or SQL
        :return: squared value of the passed param.
        """
        return i * i
    
    spark.udf.register("makeItSquared", square) # register the square udf for Spark SQL
    
  2. UDFをコールします。
    SQLノートブックを作成し、以下のクエリーをセルに追加します。

    SQL

CREATE LIVE TABLE raw_squared
AS SELECT makeItSquared(2) AS numSquared;
```

  1. パイプランを作成します。
    新規Delta Live Tablesパイプラインを作成し、作成したノートブックをNotebook Librariesに追加します。Create Pipelineダイアログでノートブックを追加するのにAdd notebook libraryボタンを使用するか、ノートブックを設定するためにDelta Live Tablesのsettingslibrariesフィールドに追加します。

Delta Live TablesパイプラインでMLflowモデルを使う

シナリオ

パイプラインにおいて、MLflowのトレーニング済みモデルを使用したいと考えています。

ソリューション

Delta Live TablesパイプラインでMLflowモデルを使用するには、以下の手順を踏みます。

  1. MLflowモデルのrun IDとモデル名を取得します。run IDとモデル名は、MLflowモデルのURIを構築するために必要となります。
  2. MLflowモデルをロードするSpark UDFを定義するためにURIを使用します。
  3. MLflowモデルを使用するためにテーブル定義でUDFをコールします。

以下の例では、ローンリスクデータに対してトレーニングしたMLflowモデルをロードするloaded_modelというSpark UDFを定義しています。gtb_scoring_train_datagtb_scoring_valid_dataというテーブルを定義するためにUDFloaded_modelが使用されます。

Python
%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id= "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = "runs:/{run_id}/{model_name}".format(run_id=run_id, model_name=model_name)
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML scored training dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_train_data():
  return dlt.read("train_data")
    .withColumn('predictions', loaded_model(struct(features)))

@dlt.table(
  comment="GBT ML scored valid dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_valid_data():
  return dlt.read("valid_data")
    .withColumn('predictions', loaded_model(struct(features)))

開発用、テスト用のサンプルデータセットを作成する

シナリオ

データのサブセットや特定のレコードタイプなど、開発やテストのためのサンプルデータセットを作成したいと考えています。

ソリューション

単一あるいは共有ノートブックで変換ロジックを実装します。そして、環境に基づいて複数のデータセットを定義する別のノートブックを作成します。例えば、本格運用(production)においては、パイプラインのために完全なセットのデータを定義するノートブックを作成します。

SQL
CREATE INCREMENTAL LIVE TABLE input_data AS SELECT * FROM cloud_files("/production/data", "json")

次に、要件に基づいてサンプルのデータを定義するノートブックを作成します。例えば、テストのために特定のレコードを含む小規模なデータセットを生成します。

SQL
CREATE LIVE TABLE input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

あるいは、開発、テストのために本格運用ようのデータのサブセットを作成することができます。

SQL
CREATE LIVE TABLE input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

異なるデータセットを使用するには、変換ロジックを実装するノートブックを用いて複数のパイプラインを作成します。それぞれのパイプラインはLIVE.input_dataデータセットから読み込みを行いますが、環境固有のデータセットを作成するノートブックを含めるように設定されます。

複数のliveテーブルをプログラムから管理、作成する

シナリオ

少しのパラメーターだけが異なる複数のフロー、データセット定義を含むパイプラインがあるとします。この冗長性はエラーが混入しやすく、メンテナンスしにくいものとなります。例えば、以下の図では、異なるカテゴリーの緊急連絡に対する最速のレスポンスが可能となる近隣の消防署を検索するためのデータセットを使用するパイプラインのグラフを示しています。この例では、並列のフローにおいては、わずかなパラメーターのみが異なっています。

ソリューション

冗長なフロー定義を生成、管理するオーバーヘッドを削減するために、メタプログラミングパターンを使用することができます。Delta Live Tablesにおけるメタプログラミングは、Pythonの内部関数(inner function)を使うことで達成することができます。これらの関数は遅延評価されるので、入力パラメーター以外が同じフローを作成するために使用することができます。以下の例のように、それぞれの呼び出しには、それぞれのテーブルをどのように作成するのかを制御する異なるパラメーターセットを含めることができます。

Python
import dlt
from pyspark.sql.functions import *

@dlt.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
    .select('call_type', 'received', 'responded', 'neighborhood')
  )

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dlt.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return (
      spark.sql("""
        SELECT
          unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
          unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
          neighborhood
        FROM LIVE.raw_fire_department
        WHERE call_type = '{filter}'
      """.format(filter=filter))
    )

  @dlt.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time "
  )
  def create_response_table():
    return (
      spark.sql("""
        SELECT
          neighborhood,
          AVG((ts_received - ts_responded)) as response_time
        FROM LIVE.{call_table}
        GROUP BY 1
        ORDER BY response_time
        LIMIT 10
      """.format(call_table=call_table))
    )

  all_tables.append(response_table)

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dlt.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dlt.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x,y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Databricks 無料トライアル

Databricks 無料トライアル

1
2
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?