I am following the AWS Big Data Blog article to create a job in AWS Glue Studio that uses PyDeequ to validate data.

I was able to run PyDeequ in the job, but when I use some of the Check methods, the job keeps running even after all the processing has completed successfully.

I checked the execution logs and found that messages such as "glue.LogPusher (Logging.scala:logInfo(57)): uploading /tmp/spark-event-logs/ to s3://bucket/sparkHistoryLogs/" were being written periodically. It appears that a process named LogPusher was repeatedly trying to upload event logs to S3.

Does anyone know what causes this behavior or how to fix it?

Below is the code for a Glue job that can reproduce the above issue.

# Language of job: Python
# Glue version: 3.0
# Deequ version: deequ-2.0.1-spark-3.2

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from pydeequ.analyzers import (
    AnalysisRunner,
    AnalyzerContext,
    Completeness,
    Maximum,
    MaxLength,
    Minimum,
    MinLength,
    Size,
    UniqueValueRatio,
)
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

dyf = glue_context.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://bucket/test.csv"],
        "recurse": True,
    }
)

df = dyf.toDF()
df.show()

# output:
# +-------+-------+
# |column1|column2|
# +-------+-------+
# |      a|      1|
# |      b|      2|
# |      c|      3|
# +-------+-------+

runner = AnalysisRunner(spark).onData(df)

runner.addAnalyzer(Size())
runner.addAnalyzer(Completeness("column1"))
runner.addAnalyzer(Completeness("column2"))
runner.addAnalyzer(UniqueValueRatio(["column1"]))
runner.addAnalyzer(UniqueValueRatio(["column2"]))
runner.addAnalyzer(MinLength("column2"))
runner.addAnalyzer(MaxLength("column2"))
runner.addAnalyzer(Minimum("column2"))
runner.addAnalyzer(Maximum("column2"))

result = runner.run()
result_df = AnalyzerContext.successMetricsAsDataFrame(spark, result)
result_df.show(truncate=False)

# output:
# +-------+--------+----------------+-----+
# |entity |instance|name            |value|
# +-------+--------+----------------+-----+
# |Column |column2 |UniqueValueRatio|1.0  |
# |Dataset|*       |Size            |3.0  |
# |Column |column1 |Completeness    |1.0  |
# |Column |column1 |UniqueValueRatio|1.0  |
# |Column |column2 |Completeness    |1.0  |
# |Column |column2 |MinLength       |1.0  |
# |Column |column2 |MaxLength       |1.0  |
# +-------+--------+----------------+-----+

check_1 = Check(spark, CheckLevel.Warning, "isComplete").isComplete("column1")
result_1 = VerificationSuite(spark).onData(df).addCheck(check_1).run()
result_df_1 = VerificationResult.checkResultsAsDataFrame(spark, result_1)
result_df_1.show(truncate=False)

# output:
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+
# |check     |check_level|check_status|constraint                                        |constraint_status|constraint_message|
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+
# |isComplete|Warning    |Success     |CompletenessConstraint(Completeness(column1,None))|Success          |                  |
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+

# Up to this point, the job can be completed successfully.

check_2 = Check(spark, CheckLevel.Warning, "hasMinLength").hasMinLength("column1", lambda x: x >= 1)
result_2 = VerificationSuite(spark).onData(df).addCheck(check_2).run()
result_df_2 = VerificationResult.checkResultsAsDataFrame(spark, result_2)
result_df_2.show(truncate=False)

# output:
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+
# |check       |check_level|check_status|constraint                                  |constraint_status|constraint_message|
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+
# |hasMinLength|Warning    |Success     |MinLengthConstraint(MinLength(column1,None))|Success          |                  |
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+

# When the code above runs, the results are displayed normally, but the job never finishes.
trgs
  • Do you have the Spark history server enabled on your job? If so, try disabling it. – Prabhakar Reddy Sep 23 '22 at 01:45
  • Thank you for your comment. I have disabled all the checkboxes in the job's advanced properties for logging and metrics settings, including 'Spark UI', but the event still occurs. I was under the impression that log sending to the spark history server would not happen without enabling Spark UI, so I am curious as to why this error occurs. – trgs Sep 27 '22 at 03:50

1 Answer

You need to shut down your Spark session at the end of the job. The PyDeequ README recommends these two calls to prevent hanging processes:

spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()
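
One way to make sure this cleanup runs even when a read or a check raises is to wrap the job body in try/finally. The sketch below is only an illustration and is not part of PyDeequ or Glue: `run_then_shutdown` and `job` are hypothetical names, and `spark` is assumed to be the session obtained from `glue_context.spark_session`. Note that `_gateway` is a private PySpark attribute, so this relies on internals that may change between versions.

```python
def run_then_shutdown(spark, job):
    """Run job(spark), then always release Spark resources.

    `job` is a hypothetical callable that holds the analyzer and
    verification code; `spark` is the job's SparkSession.
    """
    try:
        return job(spark)
    finally:
        # Same two calls as above: stop the Py4J callback server first,
        # then stop the Spark session itself.
        spark.sparkContext._gateway.shutdown_callback_server()
        spark.stop()
```

For the job in the question, the AnalysisRunner and VerificationSuite code would move into a function that takes `spark` as its argument, and that function would be passed as `job`.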
Xena