4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks にて Amazon が開発している大規模なデータ品質をテスト可能な PyDeequ を動かしてみた

Posted at

概要

Databricks にて Amazon が開発している PyDeequ の動作確認を実施しましたので共有します。PyDeequ (python-deequ)の README に記載されているクイックスタートを Databricks の仕様に修正したコードで実行しました。

image.png

引用元:awslabs/python-deequ: Python API for Deequ (github.com)

クイックスタート on Databricks

PyDeequ に対応した Spark バージョンのクラスター(12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12))にて deequ (com.amazon.deequ:deequ:2.0.4-spark-3.3)をインストール

image.png

ライブラリのインストール、および、SPARK_VERSION変数の定義

%pip install pydeequ==1.2.0 -q
dbutils.library.restartPython()

image.png

import os
os.environ["SPARK_VERSION"] = spark.version

image.png

Set up a PySpark session

from pyspark.sql import SparkSession, Row
import pydeequ

# Databricks では SparkSession の定義が不要
# spark = (SparkSession
#     .builder
#     .config("spark.jars.packages", pydeequ.deequ_maven_coord)
#     .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
#     .getOrCreate())

df = spark.sparkContext.parallelize(
    [
        Row(a="foo", b=1, c=5),
        Row(a="bar", b=2, c=6),
        Row(a="baz", b=3, c=None),
    ]
).toDF()
df.display()

image.png

Analyzers

from pydeequ.analyzers import *

analysisResult = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("b"))
    .run()
)

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
# analysisResult_df.show()
analysisResult_df.display()

image.png

Profile

from pyspark.sql.types import StringType

# String 型のカラムを除外
df2 = df.select([column for column in df.columns if df.schema[column].dataType != StringType()])
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark).onData(df2).run()

for col, profile in result.profiles.items():
    print(profile)

image.png

Constraint Suggestions

from pydeequ.suggestions import *

suggestionResult = (
    ConstraintSuggestionRunner(spark).onData(df).addConstraintRule(DEFAULT()).run()
)

# Constraint Suggestions in JSON format
print(suggestionResult)

image.png

Constraint Verification

from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check.hasSize(lambda x: x >= 3)
        .hasMin("b", lambda x: x == 0)
        .isComplete("c")
        .isUnique("a")
        .isContainedIn("a", ["foo", "bar", "baz"])
        .isNonNegative("b")
    )
    .run()
)

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
# checkResult_df.show()
checkResult_df.display()

image.png

Repository

from pydeequ.repository import *
from pydeequ.analyzers import *

metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, "metrics.json")
repository = FileSystemMetricsRepository(spark, metrics_file)
key_tags = {"tag": "pydeequ hello world"}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

analysisResult = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(ApproxCountDistinct("b"))
    .useRepository(repository)
    .saveOrAppendResult(resultKey)
    .run()
)
result_metrep_df = (
    repository.load()
    .before(ResultKey.current_milli_time())
    .forAnalyzers([ApproxCountDistinct("b")])
    .getSuccessMetricsAsDataFrame()
)
result_metrep_df.display()

image.png

注意事項

1. pydeequ のインポート時のエラーへの対応方法

pydeequ のインポート時に次のようなエラーが発生する場合があります。

RuntimeError: SPARK_VERSION environment variable is required. Supported values are: dict_keys(['3.3', '3.2', '3.1', '3.0', '2.4'])

image.png

SPARK_VERSION変数に spark のバージョンをセットすることで対応できます。

import os
os.environ["SPARK_VERSION"] = spark.version

image.png

Profile 時の StringColumnProfile エラーへの対応方法

KeyError: 'StringColumnProfile'

image.png

String 型のカラムがあるとエラーとなるため、そのカラムを除外することで対応できます。

from pyspark.sql.types import StringType

# String 型のカラムを除外
df2 = df.select([column for column in df.columns if df.schema[column].dataType != StringType()])

image.png

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?