概要
Databricks にて Amazon が開発している PyDeequ の動作確認を実施しましたので共有します。PyDeequ (python-deequ)の README に記載されているクイックスタートを Databricks の仕様に修正したコードで実行しました。
引用元:awslabs/python-deequ: Python API for Deequ (github.com)
クイックスタート on Databricks
PyDeequ に対応した Spark バージョンのクラスター(12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12)
)にて deequ (com.amazon.deequ:deequ:2.0.4-spark-3.3
)をインストール
ライブラリのインストール、および、SPARK_VERSION
変数の定義
%pip install pydeequ==1.2.0 -q
dbutils.library.restartPython()
import os
os.environ["SPARK_VERSION"] = spark.version
Set up a PySpark session
from pyspark.sql import SparkSession, Row
import pydeequ
# Databricks では SparkSession の定義が不要
# spark = (SparkSession
# .builder
# .config("spark.jars.packages", pydeequ.deequ_maven_coord)
# .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
# .getOrCreate())
df = spark.sparkContext.parallelize(
[
Row(a="foo", b=1, c=5),
Row(a="bar", b=2, c=6),
Row(a="baz", b=3, c=None),
]
).toDF()
df.display()
Analyzers
from pydeequ.analyzers import *
analysisResult = (
AnalysisRunner(spark)
.onData(df)
.addAnalyzer(Size())
.addAnalyzer(Completeness("b"))
.run()
)
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
# analysisResult_df.show()
analysisResult_df.display()
Profile
from pyspark.sql.types import StringType
# String 型のカラムを除外
df2 = df.select([column for column in df.columns if df.schema[column].dataType != StringType()])
from pydeequ.profiles import *
result = ColumnProfilerRunner(spark).onData(df2).run()
for col, profile in result.profiles.items():
print(profile)
Constraint Suggestions
from pydeequ.suggestions import *
suggestionResult = (
ConstraintSuggestionRunner(spark).onData(df).addConstraintRule(DEFAULT()).run()
)
# Constraint Suggestions in JSON format
print(suggestionResult)
Constraint Verification
from pydeequ.checks import *
from pydeequ.verification import *
check = Check(spark, CheckLevel.Warning, "Review Check")
checkResult = (
VerificationSuite(spark)
.onData(df)
.addCheck(
check.hasSize(lambda x: x >= 3)
.hasMin("b", lambda x: x == 0)
.isComplete("c")
.isUnique("a")
.isContainedIn("a", ["foo", "bar", "baz"])
.isNonNegative("b")
)
.run()
)
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
# checkResult_df.show()
checkResult_df.display()
Repository
from pydeequ.repository import *
from pydeequ.analyzers import *
metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, "metrics.json")
repository = FileSystemMetricsRepository(spark, metrics_file)
key_tags = {"tag": "pydeequ hello world"}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)
analysisResult = (
AnalysisRunner(spark)
.onData(df)
.addAnalyzer(ApproxCountDistinct("b"))
.useRepository(repository)
.saveOrAppendResult(resultKey)
.run()
)
result_metrep_df = (
repository.load()
.before(ResultKey.current_milli_time())
.forAnalyzers([ApproxCountDistinct("b")])
.getSuccessMetricsAsDataFrame()
)
result_metrep_df.display()
注意事項
1. pydeequ のインポート時のエラーへの対応方法
pydeequ のインポート時に次のようなエラーが発生する場合があります。
RuntimeError: SPARK_VERSION environment variable is required. Supported values are: dict_keys(['3.3', '3.2', '3.1', '3.0', '2.4'])
SPARK_VERSION
変数に spark のバージョンをセットすることで対応できます。
import os
os.environ["SPARK_VERSION"] = spark.version
Profile 時の StringColumnProfile エラーへの対応方法
KeyError: 'StringColumnProfile'
String 型のカラムがあるとエラーとなるため、そのカラムを除外することで対応できます。
from pyspark.sql.types import StringType
# String 型のカラムを除外
df2 = df.select([column for column in df.columns if df.schema[column].dataType != StringType()])