I need to run data quality tests, so I am using Amazon Deequ. With the code below I can get the overall success/failure status of the checks, but I also want to collect all the rows that failed a check and store them in another DataFrame or Hive table. How can I do that? Also, can Deequ be run on multiple datasets at the same time? The working code is below; I need help with the part that stores the failed records.

import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.ConstraintStatus
import org.apache.spark.sql.SparkSession

object Test extends App {

  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("amazon-deequ-test")
    .getOrCreate()

  // Sample rows; the type annotation lets the null entries compile as String
  val rows: Seq[(Int, String, String, String, Int)] = Seq(
    (1, "Thingy A", "awesome thing.", "high", 0),
    (2, "Thingy B", "available at http://thingb.com", null, 0),
    (3, null, null, "low", 5),
    (4, "Thingy D", "checkout https://thingd.ca", "low", -10),
    (5, "Thingy E", null, "high", 12))

  val cols = Seq("id", "productName", "description", "priority", "numViews")
  val data = spark.createDataFrame(rows).toDF(cols: _*)
  data.show(false)

  // The braces form a block expression whose value is the result of run()
  val verificationResult: VerificationResult = {
    VerificationSuite()
      .onData(data)
      .addCheck(
        Check(CheckLevel.Error, "integrity checks")
          // we expect 5 records
          .hasSize(_ == 5)
          // 'id' should never be NULL
          .isComplete("id")
          // 'id' should not contain duplicates
          .isUnique("id")
          // 'productName' should never be NULL
          .isComplete("productName")
          // 'priority' should only contain the values "high" and "low"
          .isContainedIn("priority", Array("high", "low"))
          // 'numViews' should not contain negative values
          .isNonNegative("numViews"))
      .addCheck(
        Check(CheckLevel.Warning, "distribution checks")
          // at least half of the 'description's should contain a url
          .containsURL("description", _ >= 0.5)
          // half of the items should have less than 10 'numViews'
          .hasApproxQuantile("numViews", 0.5, _ <= 10))
      .run()

}

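// One row per constraint (not per input record), with its status and message
val resultDataFrame = com.amazon.deequ.VerificationResult.checkResultsAsDataFrame(spark, verificationResult)
resultDataFrame.show(false)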

}

1 Answer

Note: this is taken from the Deequ homepage:

import com.amazon.deequ.checks.CheckStatus
import com.amazon.deequ.constraints.ConstraintStatus

if (verificationResult.status == CheckStatus.Success) {
  println("The data passed the test, everything is fine!")
} else {
  println("We found errors in the data:\n")

  val resultsForAllConstraints = verificationResult.checkResults
    .flatMap { case (_, checkResult) => checkResult.constraintResults }

  resultsForAllConstraints
    // keep only the constraints that did not succeed
    .filter { _.status != ConstraintStatus.Success }
    .foreach { result => println(s"${result.constraint}: ${result.message.getOrElse("")}") }
}

This is how you process the verification results. If you already have an implementation around this, you may want to post the error you are getting.
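Note that the results above are reported per constraint, not per row. To get the offending records into their own DataFrame, one possible workaround is to re-express the row-level rules as plain Spark predicates and filter the input with them. The following is only a sketch under that assumption: `FailedRowsSketch`, `failedRows` and the table name `dq_failed_records` are hypothetical names, not part of Deequ, and the predicates are hand-written to mirror the "integrity checks" from the question (NULLs in `priority` and `numViews` are treated as passing here).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

object FailedRowsSketch {

  // Hypothetical helper (not a Deequ API): returns the rows that violate
  // at least one of the row-level rules from the question's integrity checks.
  def failedRows(data: DataFrame): DataFrame = {
    // Number of rows sharing the same 'id', used to detect duplicates
    val idOccurrences = count(lit(1)).over(Window.partitionBy("id"))

    data
      .withColumn("idOccurrences", idOccurrences)
      .filter(
        col("id").isNull ||                     // 'id' must not be NULL
          col("idOccurrences") > 1 ||           // 'id' must be unique
          col("productName").isNull ||          // 'productName' must not be NULL
          (col("priority").isNotNull &&
            !col("priority").isin("high", "low")) || // only "high" / "low" allowed
          (col("numViews").isNotNull &&
            col("numViews") < 0))               // 'numViews' must be non-negative
      .drop("idOccurrences")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("failed-rows-sketch")
      .getOrCreate()

    // Same sample rows as in the question
    val rows: Seq[(Int, String, String, String, Int)] = Seq(
      (1, "Thingy A", "awesome thing.", "high", 0),
      (2, "Thingy B", "available at http://thingb.com", null, 0),
      (3, null, null, "low", 5),
      (4, "Thingy D", "checkout https://thingd.ca", "low", -10),
      (5, "Thingy E", null, "high", 12))

    val data = spark.createDataFrame(rows)
      .toDF("id", "productName", "description", "priority", "numViews")

    val failed = failedRows(data)
    failed.show(false)

    // To persist the bad records, assuming Hive support is enabled and using
    // a hypothetical table name:
    // failed.write.mode("overwrite").saveAsTable("dq_failed_records")

    spark.stop()
  }
}
```

Aggregate checks such as `hasSize`, `containsURL` with a ratio threshold, or `hasApproxQuantile` describe the dataset as a whole, so there is no single row to blame for them; only the row-level rules can be turned into a filter like this.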

devilpreet