
I'm using pydeequ with Spark 3.0.1 to perform some constraint checks on data.

When testing with the VerificationSuite, after calling VerificationResult.checkResultsAsDataFrame(spark, result), the callback server started by pydeequ does not seem to be terminated automatically.

If I run code containing pydeequ on an EMR cluster, for example, port 25334 stays open after the Spark application finishes, unless I explicitly create a JavaGateway from the Spark session's gateway and call its close() method.

import pydeequ
from pydeequ.checks import *
from pydeequ.verification import *

from pyspark.sql import SparkSession, Row

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())


df = spark.sparkContext.parallelize([
            Row(a="foo", b=1, c=5),
            Row(a="bar", b=2, c=6),
            Row(a="baz", b=None, c=None)]).toDF()

from py4j.java_gateway import JavaGateway

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x < 3) \
        .hasMin("b", lambda x: x == 0) \
        .isComplete("c")  \
        .isUnique("a")  \
        .isContainedIn("a", ["foo", "bar", "baz"]) \
        .isNonNegative("b")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

a = JavaGateway(spark.sparkContext._gateway)
a.close()

If I omit those last two lines, the callback server keeps the port open.

Is there a way around this?

dataviews
1 Answer

The PyDeequ GitHub README says to use the following to shut down the Spark session:

spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()
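To make sure the teardown runs even when a check raises, the two calls can be wrapped in a small helper and invoked from a `finally` block. This is only a sketch; `shutdown_spark` is a hypothetical helper name, not part of the pydeequ API:

```python
def shutdown_spark(spark):
    """Stop py4j's callback server before stopping the Spark session.

    Without shutdown_callback_server(), the callback port (25334 by
    default) can stay open after the application exits.
    """
    try:
        # py4j JavaGateway method that closes the callback server socket
        spark.sparkContext._gateway.shutdown_callback_server()
    finally:
        # stop the Spark session regardless of whether the shutdown raised
        spark.stop()
```

Typical usage would be `try: ...run pydeequ checks... finally: shutdown_spark(spark)`.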
LRon