LoginSignup
2
0

More than 1 year has passed since last update.

Deequでsparkのデータ品質チェックをやってみる

Posted at

はじめに

今回は、deequを使って、Databricks上のsparkでデータの品質チェックをやってみるというイージな記事です。

さまざまなデータを扱うにあたり、想定しているデータがちゃんとロードできているのか、予想外のデータが入ってきていないのかなど、想定していないデータが含まれることにより、システムに大きな影響を与えます。データ分析において、データ品質は非常に重要なものとなります。

deequとは

「このブログ記事では、Amazon で開発し、使用されているオープンソースツールである Deequ を紹介したいと思います。Deequ では、データセットのデータ品質メトリクスを計算したり、データ品質の制約を定義および確認したり、データ分布の変化について通知を受け取ったりすることができます。確認や検証のアルゴリズムを直接実装する代わりに、データの外観を記述することに集中できます。Deequ は確認を提案することでお客様をサポートします。Deequ は Apache Spark に実装されています。通常は分散型ファイルシステムまたはデータウェアハウスに存在する大規模なデータセット (数十億行の規模と考えられる) に合わせて拡張するように設計されています。」

とのことで、AWSで開発されたデータ品質を検証するためのツール的な位置付けになります。
詳細な情報は上記のリンクにわかりやすく書いていただいているので一読いただくのが良いかと思います。

DatabricksでDeequを使ってみる

Clusterへのjarアップロード

Deequを使うために、MavenからjarをDownloadして、Databricksにinstallします。
今回は、Databricks Runtime(10.4 sparl 3.2.1/Scala 2.12)を利用します。

まず、Mavenのrepoから、jarをDownloadします。

DatabricksのClusterのGUIから、Libralies -> Install newを選択して、Downloadしたjarをuploadします。

Screenshot 2022-09-05 at 16.54.06.jpg

installが完了すると、こんな形で表示されます。

Screenshot 2022-09-05 at 17.18.22.jpg

これで、使う準備は整いました。

Deequをやってみる

今回は、sample notebookをそのまま利用してやってみます。

Notebookはこちら

Notebookをimportしたら、いつもどおりClusterをアタッチします。
今回はscalaを使います。

まずはスキーマ定義をやります。

case class Item(
  id: Long,
  productName: String,
  description: String,
  priority: String,
  numViews: Long
)

Screenshot 2022-09-05 at 17.27.55.jpg

Dataframeを作ります。

val rdd = spark.sparkContext.parallelize(Seq(
  Item(1, "Thingy A", "awesome thing.", "high", 0),
  Item(2, "Thingy B", "available at http://thingb.com", null, 0),
  Item(3, null, null, "low", 5),
  Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10),
  Item(5, "Thingy E", null, "high", 12)))

val data = spark.createDataFrame(rdd)

Screenshot 2022-09-05 at 17.28.54.jpg

データの中身はこんな感じです。

Screenshot 2022-09-05 at 17.29.51.jpg

ライブラリをimportします。

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.ConstraintStatus

Screenshot 2022-09-05 at 17.30.34.jpg

VerificatioSuite Classからチェック制約を書いていきます。

val verificationResult = VerificationSuite()
  .onData(data)
  .addCheck(
    Check(CheckLevel.Error, "unit testing my data")
      .hasSize(_ == 5) // we expect 5 rows
      .isComplete("id") // should never be NULL
      .isUnique("id") // should not contain duplicates
      .isComplete("productName") // should never be NULL
      // should only contain the values "high" and "low"
      .isContainedIn("priority", Array("high", "low"))
      .isNonNegative("numViews") // should not contain negative values
      // at least half of the descriptions should contain a url
      .containsURL("description", _ >= 0.5)
      // half of the items should have less than 10 views
      .hasApproxQuantile("numViews", 0.5, _ <= 10))
    .run()

Screenshot 2022-09-05 at 17.32.09.jpg

チェック制約については、.xxxxという形で記述していきます。テストコードで何をしているかというと、コメントに書いてあるとおりなのですが

Checkmethod 何をしてるか
.hasSize 期待しているデータ件数
.isComplete("id") Nullでないこと
.isUnique("id") IDがuniqueであること
.isComplete("productName") Nullでないこと
.isContainedIn("priority", Array("high", "low")) priority列は"high" または "low"が含まれていること
.isNonNegative("numViews") numViews列はマイナスの値でないこと
.containsURL("description", _ >= 0.5) description列にはURLが半分以上含まれていること
.hasApproxQuantile("numViews", 0.5, _ <= 10)) numview列の値の半分は10以下であること

チェックの制約で何が利用できるかは、先のAWSのDocumentか、sourceを見るのが良さそうです。

チェック制約の実行結果をみていきたいと思います。

if (verificationResult.status == CheckStatus.Success) {
  println("The data passed the test, everything is fine!")
} else {
  println("We found errors in the data:\n")

  val resultsForAllConstraints = verificationResult.checkResults
    .flatMap { case (_, checkResult) => checkResult.constraintResults }

  resultsForAllConstraints
    .filter { _.status != ConstraintStatus.Success }
    .foreach { result => println(s"${result.constraint}: ${result.message.get}") }
}

Screenshot 2022-09-05 at 17.50.22.jpg

CompletenessConstraint(Completeness(productName,None)): Value: 0.8 does not meet the constraint requirement!

Screenshot 2022-09-05 at 17.29.51.jpg

まずは、id3のProduct Nameがnullなので、.isComplete("productName")制約に引っ掛かっています。

containsURL(description): Value: 0.4 does not meet the constraint requirement!

.containsURL("description", _ >= 0.5)の半分はurlを含まないといけないので、エラーになっちゃいます。

なので、無理やりですが、、、DataFrameを修正して、テストが通るかやってみます。先ほどとほぼ同じコードです。

val rdd2 = spark.sparkContext.parallelize(Seq(
  Item(1, "Thingy A", "awesome thing. https://hogehoge.com", "high", 0),
  Item(2, "Thingy B", "available at http://thingb.com", null, 0),
  Item(3, "Thingy C", "hellow deequ at https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/VerificationSuite.scala", "low", 5),
  Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10),
  Item(5, "Thingy E", "Hello databricks at https://databricks.com", "high", 12)))

val data2 = spark.createDataFrame(rdd2)
val verificationResult = VerificationSuite()
  .onData(data2)
  .addCheck(
    Check(CheckLevel.Error, "unit testing my data")
      .hasSize(_ == 5) // we expect 5 rows
      .isComplete("id") // should never be NULL
      .isUnique("id") // should not contain duplicates
      .isComplete("productName") // should never be NULL
      // should only contain the values "high" and "low"
      .isContainedIn("priority", Array("high", "low"))
      .isNonNegative("numViews") // should not contain negative values
      // at least half of the descriptions should contain a url
      .containsURL("description", _ >= 0.5)
      // half of the items should have less than 10 views
      .hasApproxQuantile("numViews", 0.5, _ <= 10))
    .run()
if (verificationResult.status == CheckStatus.Success) {
  println("The data passed the test, everything is fine!")
} else {
  println("We found errors in the data:\n")

  val resultsForAllConstraints = verificationResult.checkResults
    .flatMap { case (_, checkResult) => checkResult.constraintResults }

  resultsForAllConstraints
    .filter { _.status != ConstraintStatus.Success }
    .foreach { result => println(s"${result.constraint}: ${result.message.get}") }
}

当然ですが、、、通るようになりました。

Screenshot 2022-09-05 at 17.58.14.jpg

おまけ

AWSのDocumentで見つけたのですが、Analyzerを使うことで、指定したカラムのカウントや、平均値、条件を満たしているかどうかなどの情報を取得することができます。

import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}

val analysisResult: AnalyzerContext = { AnalysisRunner
  // data to run the analysis on
  .onData(data2)
  // define analyzers that compute metrics
  .addAnalyzer(Size())
  .addAnalyzer(Completeness("id"))
  .addAnalyzer(ApproxCountDistinct("id"))
  .addAnalyzer(Mean("numviews"))
  // compute metrics
  .run()
}

Screenshot 2022-09-05 at 18.03.30.jpg

// retrieve successfully computed metrics as a Spark data frame
val metrics = successMetricsAsDataFrame(spark, analysisResult)
display(metrics)

Screenshot 2022-09-05 at 18.04.54.jpg

また、これはかなりいいなと思ったのですが、suggestionを使うことで、どのデータ制約を使えばいいのかをレコメンドしてくれます。

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for toDS method

// We ask deequ to compute constraint suggestions for us on the data
val suggestionResult = { ConstraintSuggestionRunner()
  // data to suggest constraints for
  .onData(data2)
  // default set of rules for constraint suggestion
  .addConstraintRules(Rules.DEFAULT)
  // run data profiling and constraint suggestion
  .run()
}

// We can now investigate the constraints that Deequ suggested.
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap { 
  case (column, suggestions) => 
    suggestions.map { constraint =>
      (column, constraint.description, constraint.codeForConstraint)
    } 
}.toSeq.toDS()

Screenshot 2022-09-05 at 18.07.35.jpg

Screenshot 2022-09-05 at 18.07.57.jpg

最後に

感覚的に書けるのと、sparkで使うことを想定しており、大規模なデータチェックにも使える感じかなと思っております。
結構便利なので、是非試してみましょう!

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0