はじめに
今回は、deequを使って、Databricks上のsparkでデータの品質チェックをやってみるというイージな記事です。
さまざまなデータを扱うにあたり、想定しているデータがちゃんとロードできているのか、予想外のデータが入ってきていないのかなど、想定していないデータが含まれることにより、システムに大きな影響を与えます。データ分析において、データ品質は非常に重要なものとなります。
deequとは
「このブログ記事では、Amazon で開発し、使用されているオープンソースツールである Deequ を紹介したいと思います。Deequ では、データセットのデータ品質メトリクスを計算したり、データ品質の制約を定義および確認したり、データ分布の変化について通知を受け取ったりすることができます。確認や検証のアルゴリズムを直接実装する代わりに、データの外観を記述することに集中できます。Deequ は確認を提案することでお客様をサポートします。Deequ は Apache Spark に実装されています。通常は分散型ファイルシステムまたはデータウェアハウスに存在する大規模なデータセット (数十億行の規模と考えられる) に合わせて拡張するように設計されています。」
とのことで、AWSで開発されたデータ品質を検証するためのツール的な位置付けになります。
詳細な情報は上記のリンクにわかりやすく書いていただいているので一読いただくのが良いかと思います。
DatabricksでDeequを使ってみる
Clusterへのjarアップロード
Deequを使うために、MavenからjarをDownloadして、Databricksにinstallします。
今回は、Databricks Runtime(10.4 sparl 3.2.1/Scala 2.12)を利用します。
まず、Mavenのrepoから、jarをDownloadします。
DatabricksのClusterのGUIから、Libralies -> Install newを選択して、Downloadしたjarをuploadします。
`
installが完了すると、こんな形で表示されます。
これで、使う準備は整いました。
Deequをやってみる
今回は、sample notebookをそのまま利用してやってみます。
Notebookはこちら
Notebookをimportしたら、いつもどおりClusterをアタッチします。
今回はscalaを使います。
まずはスキーマ定義をやります。
case class Item(
id: Long,
productName: String,
description: String,
priority: String,
numViews: Long
)
Dataframeを作ります。
val rdd = spark.sparkContext.parallelize(Seq(
Item(1, "Thingy A", "awesome thing.", "high", 0),
Item(2, "Thingy B", "available at http://thingb.com", null, 0),
Item(3, null, null, "low", 5),
Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10),
Item(5, "Thingy E", null, "high", 12)))
val data = spark.createDataFrame(rdd)
データの中身はこんな感じです。
ライブラリをimportします。
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.ConstraintStatus
VerificatioSuite Classからチェック制約を書いていきます。
val verificationResult = VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ == 5) // we expect 5 rows
.isComplete("id") // should never be NULL
.isUnique("id") // should not contain duplicates
.isComplete("productName") // should never be NULL
// should only contain the values "high" and "low"
.isContainedIn("priority", Array("high", "low"))
.isNonNegative("numViews") // should not contain negative values
// at least half of the descriptions should contain a url
.containsURL("description", _ >= 0.5)
// half of the items should have less than 10 views
.hasApproxQuantile("numViews", 0.5, _ <= 10))
.run()
チェック制約については、.xxxxという形で記述していきます。テストコードで何をしているかというと、コメントに書いてあるとおりなのですが
Checkmethod | 何をしてるか |
---|---|
.hasSize | 期待しているデータ件数 |
.isComplete("id") | Nullでないこと |
.isUnique("id") | IDがuniqueであること |
.isComplete("productName") | Nullでないこと |
.isContainedIn("priority", Array("high", "low")) | priority列は"high" または "low"が含まれていること |
.isNonNegative("numViews") | numViews列はマイナスの値でないこと |
.containsURL("description", _ >= 0.5) | description列にはURLが半分以上含まれていること |
.hasApproxQuantile("numViews", 0.5, _ <= 10)) | numview列の値の半分は10以下であること |
チェックの制約で何が利用できるかは、先のAWSのDocumentか、sourceを見るのが良さそうです。
チェック制約の実行結果をみていきたいと思います。
if (verificationResult.status == CheckStatus.Success) {
println("The data passed the test, everything is fine!")
} else {
println("We found errors in the data:\n")
val resultsForAllConstraints = verificationResult.checkResults
.flatMap { case (_, checkResult) => checkResult.constraintResults }
resultsForAllConstraints
.filter { _.status != ConstraintStatus.Success }
.foreach { result => println(s"${result.constraint}: ${result.message.get}") }
}
CompletenessConstraint(Completeness(productName,None)): Value: 0.8 does not meet the constraint requirement!
まずは、id3のProduct Nameがnullなので、.isComplete("productName")制約に引っ掛かっています。
containsURL(description): Value: 0.4 does not meet the constraint requirement!
.containsURL("description", _ >= 0.5)の半分はurlを含まないといけないので、エラーになっちゃいます。
なので、無理やりですが、、、DataFrameを修正して、テストが通るかやってみます。先ほどとほぼ同じコードです。
val rdd2 = spark.sparkContext.parallelize(Seq(
Item(1, "Thingy A", "awesome thing. https://hogehoge.com", "high", 0),
Item(2, "Thingy B", "available at http://thingb.com", null, 0),
Item(3, "Thingy C", "hellow deequ at https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/VerificationSuite.scala", "low", 5),
Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10),
Item(5, "Thingy E", "Hello databricks at https://databricks.com", "high", 12)))
val data2 = spark.createDataFrame(rdd2)
val verificationResult = VerificationSuite()
.onData(data2)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ == 5) // we expect 5 rows
.isComplete("id") // should never be NULL
.isUnique("id") // should not contain duplicates
.isComplete("productName") // should never be NULL
// should only contain the values "high" and "low"
.isContainedIn("priority", Array("high", "low"))
.isNonNegative("numViews") // should not contain negative values
// at least half of the descriptions should contain a url
.containsURL("description", _ >= 0.5)
// half of the items should have less than 10 views
.hasApproxQuantile("numViews", 0.5, _ <= 10))
.run()
if (verificationResult.status == CheckStatus.Success) {
println("The data passed the test, everything is fine!")
} else {
println("We found errors in the data:\n")
val resultsForAllConstraints = verificationResult.checkResults
.flatMap { case (_, checkResult) => checkResult.constraintResults }
resultsForAllConstraints
.filter { _.status != ConstraintStatus.Success }
.foreach { result => println(s"${result.constraint}: ${result.message.get}") }
}
当然ですが、、、通るようになりました。
おまけ
AWSのDocumentで見つけたのですが、Analyzerを使うことで、指定したカラムのカウントや、平均値、条件を満たしているかどうかなどの情報を取得することができます。
import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}
val analysisResult: AnalyzerContext = { AnalysisRunner
// data to run the analysis on
.onData(data2)
// define analyzers that compute metrics
.addAnalyzer(Size())
.addAnalyzer(Completeness("id"))
.addAnalyzer(ApproxCountDistinct("id"))
.addAnalyzer(Mean("numviews"))
// compute metrics
.run()
}
// retrieve successfully computed metrics as a Spark data frame
val metrics = successMetricsAsDataFrame(spark, analysisResult)
display(metrics)
また、これはかなりいいなと思ったのですが、suggestionを使うことで、どのデータ制約を使えばいいのかをレコメンドしてくれます。
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for toDS method
// We ask deequ to compute constraint suggestions for us on the data
val suggestionResult = { ConstraintSuggestionRunner()
// data to suggest constraints for
.onData(data2)
// default set of rules for constraint suggestion
.addConstraintRules(Rules.DEFAULT)
// run data profiling and constraint suggestion
.run()
}
// We can now investigate the constraints that Deequ suggested.
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap {
case (column, suggestions) =>
suggestions.map { constraint =>
(column, constraint.description, constraint.codeForConstraint)
}
}.toSeq.toDS()
最後に
感覚的に書けるのと、sparkで使うことを想定しており、大規模なデータチェックにも使える感じかなと思っております。
結構便利なので、是非試してみましょう!