I have a requirement to run Data Quality Test So I am using Amazon Deequ for this. I am able to find the Data Quality Success/Failure Status using below code, but next I want to get all the rows which was failed in check and Store into another DataFrame/Hive Table. Please help on that how can I get it. Also can we perform Amazon Deequ on multiple Dataset at same time? Below is the Running Code and need help to get the code for storing bad failed records.
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.ConstraintStatus
object Test extends App {
val spark = SparkSession.builder()
.master("local[*]")
.appName("amazon-deequ-test")
.getOrCreate();
val data = Seq((1, "Thingy A", "awesome thing.", "high", 0),
(2, "Thingy B", "available at http://thingb.com", null, 0),
(3, null, null, "low", 5),
(4, "Thingy D", "checkout https://thingd.ca", "low", -10),
(5, "Thingy E", null, "high", 12))
val cols = Seq("id", "productName", "description", "priority", "numViews")
val data = spark.createDataframe(data).toDF(cols: _*)
data.show(false)
val verificationResult: verificationResult = VerificationSuite() {
VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "integrity checks")
// we expect 5 records
.hasSize(_ == 5)
// 'id' should never be NULL
.isComplete("id")
// 'id' should not contain duplicates
.isUnique("id")
// 'productName' should never be NULL
.isComplete("productName")
// 'priority' should only contain the values "high" and "low"
.isContainedIn("priority", Array("high", "low"))
// 'numViews' should not contain negative values
.isNonNegative("numViews"))
.addCheck(
Check(CheckLevel.Warning, "distribution checks")
// at least half of the 'description's should contain a url
.containsURL("description", _ >= 0.5)
// half of the items should have less than 10 'numViews'
.hasApproxQuantile("numViews", 0.5, _ <= 10))
.run()
}
val resultDataFrame = checkResultAsDataFrame(spark, verificationResult).show(false)
}