I am following the AWS Big Data Blog article to create a job in AWS Glue Studio and using pydeequ to validate the data.
I was able to run pydeequ in the job, but with some of the Check methods the job kept running even after all processing had completed successfully.
I checked the execution logs and found that messages like the following were being emitted periodically:
glue.LogPusher (Logging.scala:logInfo(57)): uploading /tmp/spark-event-logs/ to s3://bucket/sparkHistoryLogs/
It appears that a process named 'LogPusher' is repeatedly trying to upload Spark event logs to S3.
Does anyone have any idea what causes this behavior, or how to deal with it?
Below is the code for a Glue job that reproduces the issue.
# Language of job: Python
# Glue version: 3.0
# Deequ version: deequ-2.0.1-spark-3.2
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from pydeequ.analyzers import (
    AnalysisRunner,
    AnalyzerContext,
    Completeness,
    Maximum,
    MaxLength,
    Minimum,
    MinLength,
    Size,
    UniqueValueRatio,
)
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite
glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session
dyf = glue_context.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://bucket/test.csv"],
        "recurse": True,
    },
)
df = dyf.toDF()
df.show()
# output:
# +-------+-------+
# |column1|column2|
# +-------+-------+
# | a| 1|
# | b| 2|
# | c| 3|
# +-------+-------+
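# Note (my assumption): this reader returns every column as a string,
# so column2 is typed as a string even though it holds numbers. That
# seems relevant to the analyzer output further below.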
runner = AnalysisRunner(spark).onData(df)
runner.addAnalyzer(Size())
runner.addAnalyzer(Completeness("column1"))
runner.addAnalyzer(Completeness("column2"))
runner.addAnalyzer(UniqueValueRatio(["column1"]))
runner.addAnalyzer(UniqueValueRatio(["column2"]))
runner.addAnalyzer(MinLength("column2"))
runner.addAnalyzer(MaxLength("column2"))
runner.addAnalyzer(Minimum("column2"))
runner.addAnalyzer(Maximum("column2"))
result = runner.run()
result_df = AnalyzerContext.successMetricsAsDataFrame(spark, result)
result_df.show(truncate=False)
# output:
# +-------+--------+----------------+-----+
# |entity |instance|name |value|
# +-------+--------+----------------+-----+
# |Column |column2 |UniqueValueRatio|1.0 |
# |Dataset|* |Size |3.0 |
# |Column |column1 |Completeness |1.0 |
# |Column |column1 |UniqueValueRatio|1.0 |
# |Column |column2 |Completeness |1.0 |
# |Column |column2 |MinLength |1.0 |
# |Column |column2 |MaxLength |1.0 |
# +-------+--------+----------------+-----+
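# Note: Minimum and Maximum do not appear in the metrics above. My
# assumption is that those analyzers require a numeric column, and
# column2 is a string here, so they produce no success metric.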
check_1 = Check(spark, CheckLevel.Warning, "isComplete").isComplete("column1")
result_1 = VerificationSuite(spark).onData(df).addCheck(check_1).run()
result_df_1 = VerificationResult.checkResultsAsDataFrame(spark, result_1)
result_df_1.show(truncate=False)
# output:
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+
# |check |check_level|check_status|constraint |constraint_status|constraint_message|
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+
# |isComplete|Warning |Success |CompletenessConstraint(Completeness(column1,None))|Success | |
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+
# Up to this point, the job can be completed successfully.
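# One difference I can see: isComplete above is not passed a Python
# lambda, while hasMinLength below is given one as its assertion. I do
# not know whether that is related to the hang.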
check_2 = Check(spark, CheckLevel.Warning, "hasMinLength").hasMinLength("column1", lambda x: x >= 1)
result_2 = VerificationSuite(spark).onData(df).addCheck(check_2).run()
result_df_2 = VerificationResult.checkResultsAsDataFrame(spark, result_2)
result_df_2.show(truncate=False)
# output:
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+
# |check |check_level|check_status|constraint |constraint_status|constraint_message|
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+
# |hasMinLength|Warning |Success |MinLengthConstraint(MinLength(column1,None))|Success | |
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+
# When the above check is executed, the results are displayed correctly, but the job never finishes.
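For reference, the PyDeequ README recommends explicitly shutting down the Spark session after a run to prevent hanging processes. Below is a minimal cleanup sketch based on that, assuming the hang is tied to the Py4J callback server that PyDeequ starts so the JVM can invoke Python lambdas (hasMinLength is passed one, isComplete is not). I have not confirmed whether this is the intended fix for Glue jobs.
# Cleanup sketch, assuming the hang comes from the Py4J callback server
# that PyDeequ starts to let the JVM call back into Python lambdas.
# The PyDeequ README recommends this shutdown to avoid hanging processes.
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()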