
A Spark job runs expensive computations in the first stage, and I checkpoint the resulting RDD so they don't have to be repeated if executors are preempted (the job runs on YARN with preemption enabled). The job also uses a high value for spark.dynamicAllocation.cachedExecutorIdleTimeout so that cached results aren't lost.

However, after that first expensive stage the remaining stages are cheap and fast, and I want to free up cluster resources when possible. I've tried changing spark.dynamicAllocation.cachedExecutorIdleTimeout at runtime after the checkpoint, but it seems to have no effect.

    if sc.master == "yarn":
        sc.setCheckpointDir(f"/user/{sc.sparkUser()}/checkpoints")
        first_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
        first_rdd.checkpoint()
        first_rdd.count()  # force evaluation so the checkpoint is actually written
        # attempted runtime changes -- these appear to have no effect
        sc.getConf().set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "5min")
        sc.setLocalProperty("spark.dynamicAllocation.cachedExecutorIdleTimeout", "5min")

Is there a way to change the idle timeout on an existing SparkContext?


1 Answer


Once the SparkContext has been created with its properties, you can't change them the way you did, so your last two lines have no effect:

    sc.getConf().set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "5min")
    sc.setLocalProperty("spark.dynamicAllocation.cachedExecutorIdleTimeout", "5min")
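
If you need a particular idle timeout for this job, it has to be supplied before the SparkContext is created, for example on the SparkConf. A minimal sketch, where the app name and the 30min value are just placeholders:

    from pyspark import SparkConf, SparkContext

    conf = (
        SparkConf()
        .setAppName("expensive-first-stage")  # placeholder app name
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "30min")
    )
    sc = SparkContext(conf=conf)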

What you should do first is look at the difference between persistence and checkpointing; there is a good SO question on that here.

Finally, the best way to resolve your problem is probably to use a DataFrame with the persist and unpersist methods. The unpersist documentation says:

Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk. This will not un-persist any cached data that is built upon this Dataset.
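
A rough sketch of that approach (a minimal example; the input path, column name, and output path are placeholders, not taken from your job):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Expensive first stage (placeholder): cache the result so downstream
    # stages can reuse it without recomputation.
    expensive_df = spark.read.parquet("/some/input")
    expensive_df.persist()
    expensive_df.count()  # materialize the cache

    # Cheap downstream stages (placeholder).
    result = expensive_df.groupBy("key").count()
    result.write.parquet("/some/output")

    # Drop the cached blocks so the executors holding them can go idle
    # and be released by dynamic allocation.
    expensive_df.unpersist()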
