If helpful for anyone, here's a full example showing how to generate suggested data quality constraints and then check all of them.
Note that this example uses PyDeequ, the Python implementation of Scala's Deequ. The question specifically mentions Deequ, but PyDeequ has a very similar suite of APIs. I built this solution partially off of @mlin's solution.
First, let's create a string that is a concatenation of all the suggested constraints:
import json

import pydeequ
from pydeequ.suggestions import *
from pyspark.sql import SparkSession

# Creating a SparkSession with the Deequ jar on the classpath
# (the maven coordinate helpers come from the PyDeequ package)
spark = (SparkSession
         .builder
         .config("spark.jars.packages", pydeequ.deequ_maven_coord)
         .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
         .getOrCreate())

# Reading data
df = \
    (spark
     .read
     .load("path/to/data/here"))
# Getting suggested data quality constraint rules
suggested_constraints = \
    (ConstraintSuggestionRunner(spark)
     .onData(df)
     .addConstraintRule(CompleteIfCompleteRule())
     .addConstraintRule(NonNegativeNumbersRule())
     .addConstraintRule(RetainCompletenessRule())
     .addConstraintRule(RetainTypeRule())
     .addConstraintRule(UniqueIfApproximatelyUniqueRule())
     .run())
# Printing suggested constraints in JSON format
print(json.dumps(suggested_constraints, indent=2))

# Printing suggested constraints in a more readable and understandable format
for suggestion in suggested_constraints['constraint_suggestions']:
    print("-----")
    print(f"Suggested constraint for '{suggestion['column_name']}': {suggestion['description']}")
    print(f"The description for this rule is: '{suggestion['rule_description']}'")
    print(f"The corresponding Python code is: `{suggestion['code_for_constraint']}`")
# Creating empty string to concatenate against
pydeequ_validation_string = ""

# Building validation string from suggestions
for suggestion in suggested_constraints['constraint_suggestions']:
    pydeequ_validation_string = pydeequ_validation_string + suggestion["code_for_constraint"]

# Printing validation string
# If desired, edit this string to control which data quality validations are performed
print(pydeequ_validation_string)
At this point, our `pydeequ_validation_string` might look like:

.isComplete("column_1").isComplete("column_2").isComplete("column_3").isNonNegative("column_1")
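Rather than editing the concatenated string by hand, you can also filter the suggestions before building it. Here's a minimal sketch of that idea, using the same suggestion dictionary keys as above; `columns_to_validate` and its column names are hypothetical:

# Keeping only suggestions for columns we care about
# ("column_1" and "column_2" are hypothetical column names)
columns_to_validate = {"column_1", "column_2"}

pydeequ_validation_string = "".join(
    suggestion["code_for_constraint"]
    for suggestion in suggested_constraints["constraint_suggestions"]
    if suggestion["column_name"] in columns_to_validate
)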
Now, let's take our `pydeequ_validation_string` and use it to check all of these constraints at once. Here's a function to do this. Note that I'm first concatenating our string with `"check"`, and then using Python's `eval` to evaluate the resulting string as code.
import logging

# ConstrainableDataTypes is kept because the eval'd constraint string
# may reference it (e.g., in suggestions generated by RetainTypeRule)
from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes
from pydeequ.verification import VerificationResult, VerificationSuite
from pyspark.sql import functions as F

logger = logging.getLogger(__name__)


def perform_checks(df, pydeequ_validation_string):
    """Perform and log data quality checks."""
    # Initializing
    check = \
        Check(spark_session=spark,
              level=CheckLevel.Warning,
              description="Data Quality Check")

    # Building validation string of constraints to check;
    # the eval below resolves "check" to the Check object above
    pydeequ_validation_string_to_check = "check" + pydeequ_validation_string

    # Checking constraints
    checked_constraints = \
        (VerificationSuite(spark)
         .onData(df)
         .addCheck(eval(pydeequ_validation_string_to_check))
         .run())

    # Returning results as DataFrame
    df_checked_constraints = \
        (VerificationResult
         .checkResultsAsDataFrame(spark, checked_constraints))

    # .show() prints to stdout and returns None, so call it directly
    # rather than passing its return value to the logger
    logger.info("Data quality check results:")
    df_checked_constraints.show(n=df_checked_constraints.count(),
                                truncate=False)

    # Filtering for any failed data quality constraints
    df_checked_constraints_failures = \
        (df_checked_constraints
         .filter(F.col("constraint_status") == "Failure"))

    # If any data quality check fails, raise exception
    if df_checked_constraints_failures.count() > 0:
        logger.info("Failed data quality constraints:")
        df_checked_constraints_failures.show(
            n=df_checked_constraints_failures.count(),
            truncate=False)
        raise Exception("One or more data quality checks failed.")
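With everything in place, end-to-end usage might look like this, assuming the `df`, `spark`, and `pydeequ_validation_string` objects from the snippets above:

# Checking all suggested constraints against the DataFrame
# (in practice, you'd often run this against new batches of the same data)
perform_checks(df, pydeequ_validation_string)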