
I have a long-running Spark streaming job. The execution time increases gradually and linearly: over 60 minutes, the processing time goes from 40 seconds to 90 seconds.

This increase happens at an HDFS write statement:

def write_checkpoint(self, df, event_name, no_partition_save=None, no_partition_read=None, partition_by=None, cache=True):

    hdfs_path = self.get_next_checkpoint_path(event_name)  # rotate from the previous output
    if no_partition_save:
        # coalesce instead of repartition can have unwanted behaviour
        # https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
        df.repartition(no_partition_save) \
            .write \
            .mode("overwrite") \
            .save(hdfs_path)
    elif partition_by:
        df.write \
            .partitionBy(partition_by) \
            .mode("overwrite") \
            .save(hdfs_path)
    else:
        df \
            .write \
            .mode("overwrite") \
            .save(hdfs_path)
    
    # Re-read the freshly written data so the returned DataFrame has a short lineage
    if no_partition_read:
        df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
    else:
        df_new = self.spark.read.load(hdfs_path)

    if partition_by:
        df_new = df_new.repartition(partition_by)  # was df.repartition(...), which discarded the re-read data
    if cache:
        df_new.cache()  # lazy; only materialised by the first action on df_new
    return df_new
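
The helper get_next_checkpoint_path is not shown in the question. A minimal sketch of what such a rotating helper could look like, assuming it alternates each event between two fixed slots so that the slot being overwritten is never the one the previous batch was read from (the class name, base path, and slot naming here are hypothetical, not from the question):

class CheckpointHelper:
    def __init__(self, spark, base_path):
        self.spark = spark
        self.base_path = base_path  # e.g. "hdfs:///tmp/checkpoints" (assumed)
        self._counters = {}         # per-event write counter

    def get_next_checkpoint_path(self, event_name):
        # Rotate between two slots per event: the previous batch's DataFrame
        # keeps reading slot A while the new batch overwrites slot B.
        n = self._counters.get(event_name, 0)
        self._counters[event_name] = n + 1
        return "{}/{}/slot_{}".format(self.base_path, event_name, n % 2)

Note that cache() is lazy: the re-read DataFrame is only materialised by the first action executed against df_new.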


When the application starts, this save operation takes 1-2 seconds.

As time goes on, the task itself still takes 2 seconds (first picture: one completed stage, which took 2 seconds), but the whole query duration increases drastically (second picture: total time 40 seconds).

[Screenshot: task that remains at 2 seconds]

[Screenshot: job that increased to 40 seconds]

I also added some logging in Python, where the bottleneck shows up at the same operation: [Screenshot: log output]
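
The logging code itself is not shown in the question; a minimal sketch of the kind of timing wrapper that would produce such measurements (the timed helper and the commented-out usage are assumptions, not from the question):

import logging
import time
from contextlib import contextmanager

logging.basicConfig(level=logging.INFO)

@contextmanager
def timed(label):
    # Log the wall-clock duration of the wrapped block.
    start = time.time()
    yield
    logging.info("%s took %.1f s", label, time.time() - start)

# Hypothetical usage from write_checkpoint's caller:
# with timed("write_checkpoint(event)"):
#     df_new = self.write_checkpoint(df, "event", no_partition_save=200)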

What could be the reason for this?

ponthu
  • I'm not clear what you're benchmarking in the log output. Is it possible the lineage of the DF you're saving just keeps getting longer? If you don't have some mechanism to cut it, that would explain it. It's not clear what this is being called on. – Sean Owen Aug 12 '21 at 14:09
  • Hi @SeanOwen, I first thought about the lineage too, but that was not the problem. I managed to narrow it down and actually solve the problem (however, I still do not understand the issue). It seems that df.write.mode('overwrite').parquet(path) is the one to blame. When I changed this to df.write.mode('overwrite').parquet(unique_path), the process is stable and the execution time is not increasing. However... I do not really understand why – ponthu Aug 16 '21 at 08:37
  • It could've been dangerous to overwrite the same data set if your lineage depended on reads of previous copies of the data. – Sean Owen Aug 16 '21 at 23:41
  • Yes, indeed - I always had to carefully keep track of the previous path and rotate accordingly. – ponthu Aug 17 '21 at 15:13
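
A sketch of the fix described in the comments above, assuming a per-batch counter supplies the unique path; the function and parameter names here are hypothetical, not from the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Problematic pattern: every micro-batch overwrites the same fixed path.
#     df.write.mode('overwrite').parquet(path)

def write_checkpoint_stable(df, base_path, batch_id):
    # Stable pattern per the comments: each micro-batch writes to a fresh
    # path, so no live read plan points at the files being replaced.
    unique_path = "{}/batch_{}".format(base_path, batch_id)
    df.write.mode("overwrite").parquet(unique_path)
    # Re-read from the new path; the previous path can be deleted once no
    # DataFrame still depends on it.
    return spark.read.parquet(unique_path)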

0 Answers