I have a long-running Spark Streaming job. The execution time increases gradually and roughly linearly: within 60 minutes, processing goes from 40 seconds to 90 seconds.
The increase happens at an HDFS write step:
def write_checkpoint(self, df, event_name, no_partition_save=None, no_partition_read=None, partition_by=None, cache=True):
    """Write df to a fresh HDFS path, then return the re-read (optionally repartitioned and cached) DataFrame."""
    hdfs_path = self.get_next_checkpoint_path(event_name)  # rotate from the previous output
    if no_partition_save:
        # coalesce instead of repartition can have unwanted behaviour
        # https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
        df.repartition(no_partition_save) \
            .write \
            .mode("overwrite") \
            .save(hdfs_path)
    elif partition_by:
        df.write \
            .partitionBy(partition_by) \
            .mode("overwrite") \
            .save(hdfs_path)
    else:
        df.write \
            .mode("overwrite") \
            .save(hdfs_path)

    if no_partition_read:
        df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
    else:
        df_new = self.spark.read.load(hdfs_path)

    if partition_by:
        df_new = df_new.repartition(partition_by)
    if cache:
        df_new.cache()
    return df_new
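For context, this is roughly how write_checkpoint is used: it is called once per batch on an intermediate DataFrame, and the returned (re-read) DataFrame is used for the rest of the batch. A simplified sketch follows; the handler, column names, partition count and pipeline attributes are illustrative placeholders, not the real job:

from pyspark.sql import functions as F

# Illustrative batch handler; names and the partition count are placeholders.
def process_batch(pipeline, batch_df, batch_id):
    enriched = batch_df.withColumn("processed_at", F.current_timestamp())
    # Materialize the intermediate result to HDFS and keep working from the re-read copy
    checkpointed = pipeline.write_checkpoint(enriched, "events", no_partition_save=16)
    checkpointed.groupBy("event_type").count().write.mode("overwrite").save(pipeline.agg_path)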
When the application starts, this save operation takes 1-2 seconds.
As time goes on, the task itself still takes about 2 seconds (first picture: 1 completed stage, which took 2 seconds), but the overall query duration increases drastically (second picture: total time 40 seconds).
I also added some timing logs in Python, and they show the bottleneck at this same operation.
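The timing is captured roughly like this (a minimal sketch; the wrapper and logger names are illustrative, not the exact code):

import logging
import time

logger = logging.getLogger("checkpoint_timing")

# Minimal sketch of the timing added around the call; names are illustrative.
def timed_write_checkpoint(pipeline, df, event_name, **kwargs):
    start = time.monotonic()
    result = pipeline.write_checkpoint(df, event_name, **kwargs)
    logger.info("write_checkpoint(%s) took %.1f s", event_name, time.monotonic() - start)
    return result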
What could be the reason for this?