1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Delta Live TablesでUDF(ユーザー定義関数)を活用して、複雑な処理を組み込んだり、複雑な条件でデータチェックを行う方法を説明します。

Delta Live Tablesとは

Delta Live TablesはDelta LakeをベースとしたETLパイプライン開発・運用のためのツールです。

PythonあるいはSQLにおいて、宣言型の文法でデータパイプラインを定義し、ジョブのように実行する形を取ります。詳細は以下のリンク先をご覧ください。

例えば、Pythonであれば、以下のように記述を行うことでデータパイプラインを定義することができます。

Python
from pyspark.sql.functions import *
from pyspark.sql.types import *


json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
@create_table(
  comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
  table_properties={
    "quality": "bronze"
  }
)
def clickstream_raw():
  return (
    spark.read.json(json_path)
  )


@create_table(
  comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
  table_properties={
    "quality": "silver"
  }
)
@expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
  return (
    read("clickstream_raw")
      .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
      .withColumn("click_count", expr("CAST(n AS INT)"))
      .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
      .withColumnRenamed("curr_title", "current_page_title")
      .withColumnRenamed("prev_title", "previous_page_title")
      .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
  )


@create_table(
  comment="A table of the most common pages that link to the Apache Spark page.",
  table_properties={
    "quality": "gold"
  }
)
def top_spark_referrers():
  return (
    read("clickstream_clean")
      .filter(expr("current_page_title == 'Apache_Spark'"))
      .withColumnRenamed("previous_page_title", "referrer")
      .sort(desc("click_count"))
      .select("referrer", "click_count")
      .limit(10)
  )


@create_table(
  comment="A list of the top 50 pages by number of clicks.",
  table_properties={
    "quality": "gold"
  }
)
def top_pages():
  return (
    read("clickstream_clean")
      .groupBy("current_page_title")
      .agg(sum("click_count").alias("total_clicks"))
      .sort(desc("total_clicks"))
      .limit(50)
  )

このノートブックをDelta Live Tablesのパイプラインとして登録し、実行すると以下のようなDAGが表示され処理が行われます。

Delta Live TablesにおけるUDFの活用

PythonやSQLを用いることで、柔軟にロジックを記述することはできるのですが、どうしても処理を共通化したい、SQLで表現できない条件をPythonで定義して呼び出したい、というようなニーズが出てきます。このような場合、UDF(User Defined Function)を定義することで、これらのニーズに応えることができます。

例えば、以下のようにPythonノートブック(呼び出し先)でUDFを定義します。最初の関数は計算を行うUDF、後者は判定を行うUDFとなります。UDFを登録する際には、戻り値の型を指定していることに注意してください。

python-callee
from pyspark.sql.types import IntegerType

def square(i: int) -> int:
    """
    指定されたパラメーターを二乗するシンプルなUDF
    :param i: Pyspark/SQLのカラム
    :return: パラメーターの二乗
    """
    return i * i

spark.udf.register("makeItSquared", square, IntegerType()) # Spark SQLで使用する二乗計算udfを登録

from pyspark.sql.types import BooleanType

def is_even(i: int) -> bool:
    """
    偶数かどうかを判定するシンプルなUDF
    :param i: Pyspark/SQLのカラム
    :return: 偶数の場合True
    """
    
    if i % 2 == 0:
      return True
    else:
      return False
    
spark.udf.register("passOnlyEven", is_even, BooleanType()) # Spark SQLで使用する偶数判定udfを登録

そして、UDFの呼び出しを行うパイプライン(呼び出し元)を定義します。

dlt-python-caller
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

@dlt.table
def raw_data():
  return spark.sql("SELECT id, makeItSquared(id) AS numSquared FROM RANGE(10)")

@dlt.table
@dlt.expect_or_drop("even_only", "passOnlyEven(numSquared) = True")
def squared_even():
  return (
    dlt.read("raw_data")
  )

こちらはSQLバージョンです。

sql-caller
CREATE LIVE TABLE raw_data
AS SELECT id, makeItSquared(id) AS numSquared FROM RANGE(10);

CREATE LIVE TABLE squared_even
(CONSTRAINT even_only EXPECT (passOnlyEven(numSquared) = True) ON VIOLATION DROP ROW)
AS SELECT id, makeItSquared(id) AS numSquared FROM live.raw_data

今回のケースでは、本データのidを二乗したものをnumSquaredとし、numSquaredが偶数のレコードのみを通過させるというパイプラインを構築しています。

パイプラインを作成する際には、呼び出し先と呼び出し元のノートブックの両方をNotebook Librariesに追加する必要があります。
Screen Shot 2021-12-15 at 18.04.12.png

このようにすることで、UDFの柔軟性、モジュール性を活用して複雑なパイプラインを構築することが可能となります。
Screen Shot 2021-12-15 at 18.09.03.png

パイプラインの終端のテーブルを確認すると、期待通りのデータになっていることがわかります。
Screen Shot 2021-12-15 at 18.10.25.png

サンプルノートブック

Databricks 無料トライアル

Databricks 無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?