You can retrieve the RDDInfo array from SparkContext and interrogate its elements for the partition counts of the RDD you're interested in. If some of the partitions were evicted or didn't fit into executor storage, numCachedPartitions will be less than the RDD's total number of partitions, numPartitions.
scala> val rdd = sc.textFile("file:///etc/spark/conf/spark-defaults.conf").repartition(10)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at repartition at <console>:27
scala> rdd.persist().count()
res14: Long = 34
scala> val rddStorage = rdd.context.getRDDStorageInfo(0)
rddStorage: org.apache.spark.storage.RDDInfo = RDD "MapPartitionsRDD" (9) StorageLevel: StorageLevel(memory, deserialized, 1 replicas); CachedPartitions: 10; TotalPartitions: 10; MemorySize: 5.1 KB; DiskSize: 0.0 B
scala> val fullyCached = (rddStorage.numCachedPartitions == rddStorage.numPartitions)
fullyCached: Boolean = true
The zero in ...getRDDStorageInfo(0) above is for illustration purposes only. In practice, instead of simply using 0, you'd get the id of the RDD you're interested in (see RDD.id), then iterate through the RDDInfo[] array to find the element with rddInfo.id == id. You can also match on rddInfo.name if you give the RDD a name (see RDD.setName).
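That lookup can be sketched in plain Scala. Since the real array requires a live SparkContext, the CacheStats case class below is a stand-in that mirrors only the RDDInfo fields used here; the find/exists logic is what you'd run against sc.getRDDStorageInfo:

```scala
// Stand-in for org.apache.spark.storage.RDDInfo, carrying only the
// fields the lookup below needs. With a live SparkContext you'd use
// the real array returned by sc.getRDDStorageInfo instead.
case class CacheStats(id: Int, name: String, numCachedPartitions: Int, numPartitions: Int)

// Find the stats for a particular RDD id, the same way you'd scan
// the RDDInfo array for rddInfo.id == id.
def statsFor(all: Seq[CacheStats], id: Int): Option[CacheStats] =
  all.find(_.id == id)

// Fully cached means no partitions were evicted or left uncached.
def isFullyCached(s: CacheStats): Boolean =
  s.numCachedPartitions == s.numPartitions
```

Usage mirrors the REPL session above: statsFor(stats, rdd.id).exists(isFullyCached) gives you the same Boolean as comparing numCachedPartitions to numPartitions directly.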
Finally, you could just detect if any RDD has eviction with something like this:
sparkSession
.sparkContext.getRDDStorageInfo.filter(_.isCached)
.find(rdd => rdd.numCachedPartitions < rdd.numPartitions)
.foreach(rdd =>
throw new IllegalArgumentException(s"RDD is being evicted, please configure cluster with more memory. " +
s"numCachedPartitions = ${rdd.numCachedPartitions}, " +
s"numPartitions = ${rdd.numPartitions}, " +
s"name = ${rdd.name}, " +
s"id = ${rdd.id}, " +
s"memSize = ${rdd.memSize}, " +
s"diskSize = ${rdd.diskSize}, " +
s"externalBlockStoreSize = ${rdd.externalBlockStoreSize}"
))