I'm using pydeequ with Spark 3.0.1 to run constraint checks on data.
When testing with the VerificationSuite, after calling VerificationResult.checkResultsAsDataFrame(spark, result), the callback server that pydeequ starts does not seem to be terminated automatically.
If I run code containing pydeequ on an EMR cluster, for example, port 25334 stays open after the Spark application closes, unless I explicitly create a JavaGateway from the Spark session's gateway and call its close() method.
import pydeequ  # needed for pydeequ.deequ_maven_coord / f2j_maven_coord below
from py4j.java_gateway import JavaGateway
from pydeequ.checks import *  # Check, CheckLevel
from pydeequ.verification import *
from pyspark.sql import SparkSession, Row

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

df = spark.sparkContext.parallelize([
    Row(a="foo", b=1, c=5),
    Row(a="bar", b=2, c=6),
    Row(a="baz", b=None, c=None)]).toDF()

check = Check(spark, CheckLevel.Warning, "Review Check")
checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x < 3) \
        .hasMin("b", lambda x: x == 0) \
        .isComplete("c") \
        .isUnique("a") \
        .isContainedIn("a", ["foo", "bar", "baz"]) \
        .isNonNegative("b")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

# Workaround: wrap the session's gateway and close it explicitly
a = JavaGateway(spark.sparkContext._gateway)
a.close()
If I leave out the last two lines of code, the callback server stays open on the port.
Is there a way around this?
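
For reference, one direction that might make the cleanup harder to forget is to wrap it in a context manager, so it runs even if a check raises. This is only a sketch under assumptions: closing_callback_server is a hypothetical helper name, _gateway is a private PySpark attribute, and shutdown_callback_server() is the py4j JavaGateway method used here in place of the close() call above. I have not confirmed that it releases the port on EMR.

```python
from contextlib import contextmanager


@contextmanager
def closing_callback_server(spark):
    """Yield the session, then shut down the Py4J callback server and stop Spark.

    Hypothetical helper: `spark` is assumed to be a SparkSession whose
    sparkContext._gateway is a py4j JavaGateway.
    """
    try:
        yield spark
    finally:
        # Runs on normal exit and on exceptions raised inside the with-block.
        spark.sparkContext._gateway.shutdown_callback_server()
        spark.stop()
```

The verification code would then go inside a with closing_callback_server(spark): block, replacing the two JavaGateway lines at the end.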