0

I want to get all the filepaths that a spark plan refers to, so that I can compare them with a predefined set of paths.

Doing a simple string search on the Spark Plan can run into issues as I see that the paths are getting truncated in the plan. I have attempted to get a non-truncated plan, but was unsuccessful in doing so (configuring spark.sql.maxMetadataStringLength and spark.debug.maxToStringFields, I also tried using .treeString(verbose=true) ). I found that df.explain(‘formatted’) was giving a non truncated output. But I would like to get it from the org.apache.spark.sql.execution.SparkPlan.

I’ve also tried extracting the FileScans from the Spark Plan. This article helped with the following code:

/**
 * Renders a file scan node as its node name followed by a bracketed list of
 * selected metadata entries.
 *
 * @param f the file scan node to describe
 * @return the node name prefix and node name followed by the selected
 *         "key: value" metadata entries, sorted and listed one per line
 */
def full_file_meta(f: FileSourceScanExec) = {
    // Only these metadata keys are included in the rendered description.
    val wantedKeys = Set(
      "Location", "PartitionCount",
      "PartitionFilters", "PushedFilters"
    )

    // Sort first so the entries always appear in a stable order.
    val selectedEntries = f.metadata.toSeq.sorted
      .filter { case (name, _) => wantedKeys.contains(name) }
      .map { case (name, text) => s"$name: ${text.toString}" }

    val renderedMetadata = selectedEntries.mkString("[\n  ", ",\n  ", "\n]")
    s"${f.nodeNamePrefix}${f.nodeName}$renderedMetadata"
}

// Physical plan of `data` as exposed by its query execution.
val ep = data.queryExecution.executedPlan

// Describe every FileSourceScanExec node reached while traversing the plan;
// nodes of any other type contribute nothing.
val scanDescriptions = ep.collect {
  case scan: FileSourceScanExec => full_file_meta(scan)
}

print(scanDescriptions.mkString(",\n"))

However, this only helps in the case of simple plans (queries such as “select * from path”). It does not work when joins are in the picture (queries such as “select * from path1 join path2”). For some reason, in the latter case no FileSourceScanExec node is ever passed to the flatMap function. In fact, the only node visited is an org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec, and it has an empty list of children.

UPDATE

I have found a semi-solution. The idea is to walk the tree until I reach a leaf of class FileSourceScanExec, and to collect all such leaves. The reason I call it a semi-solution is that I am building it on a case-by-case basis. I can't default to expanding every non-leaf node into its children, because some nodes have an empty children list and their plans need to be referenced from one of their member objects instead.

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.SparkSession
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan, UnionExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
/**
 * Spark listener that inspects the physical plan behind each SQL-triggered job
 * and cancels the job when the plan refers to a blocked lake path.
 *
 * NOTE(review): `spark` is not defined in this class; it is presumably the
 * session-level SparkSession provided by the notebook/shell — confirm it is in
 * scope wherever this class is compiled.
 */
class PhysicalPlanListener extends SparkListener {

  /** Lake paths that jobs are not allowed to read. */
  private val unauthorizedLakePaths: Seq[String] = Seq("path_to_block")

  /**
   * Renders a file scan node as its node name followed by its "Location"
   * metadata entry.
   *
   * NOTE(review): the "Location" metadata value may be abbreviated by Spark for
   * long path lists; if exact paths are required, consider reading them from
   * the scan's relation instead — verify against the Spark version in use.
   *
   * @param f the file scan node to describe
   * @return node name prefix and node name followed by the bracketed "Location" entry
   */
  def full_file_meta(f: FileSourceScanExec): String = {
    val metadataEntries = f.metadata.toSeq.sorted.collect {
      case (key, value) if key == "Location" => s"$key: $value"
    }
    val metadataStr = metadataEntries.mkString("[\n  ", ",\n  ", "\n]")
    s"${f.nodeNamePrefix}${f.nodeName}$metadataStr"
  }

  /**
   * Walks the physical plan and concatenates the description of every file
   * scan found, one per line.
   *
   * Adaptive plans and query stages report no children, so their wrapped plan
   * is followed explicitly instead of relying on `children`.
   *
   * @param plan root of the (sub)plan to inspect
   * @return descriptions of all file scans reached, or an empty string if none
   */
  def getAllPaths(plan: SparkPlan): String = plan match {
    case scan: FileSourceScanExec =>
      full_file_meta(scan) + "\n"

    case adaptive: AdaptiveSparkPlanExec =>
      getAllPaths(adaptive.executedPlan)

    // Matching the common base type covers broadcast and shuffle stages as
    // well as any other query stage implementation.
    case stage: org.apache.spark.sql.execution.adaptive.QueryStageExec =>
      getAllPaths(stage.plan)

    case other =>
      other.children.map(getAllPaths).mkString
  }

  /**
   * Cancels the starting job when the plan of its SQL execution refers to one
   * of the unauthorized lake paths. Jobs that are not tied to a SQL execution,
   * or whose execution can no longer be looked up, are left alone.
   *
   * @param jobStart the job-start event delivered by the listener bus
   */
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val jobId = jobStart.jobId

    // Each lookup may yield null (no properties, no SQL execution id, or an
    // execution that is no longer registered), so wrap them in Option rather
    // than risking a NullPointerException inside the listener.
    for {
      props <- Option(jobStart.properties)
      executionIdStr <- Option(props.getProperty(SQLExecution.EXECUTION_ID_KEY))
      queryExecution <- Option(SQLExecution.getQueryExecution(executionIdStr.toLong))
    } {
      val plan = getAllPaths(queryExecution.executedPlan)

      // Cancel once, reporting the first blocked path found in the plan.
      unauthorizedLakePaths.find(lakepath => plan.contains(lakepath)).foreach { lakepath =>
        spark.sparkContext.cancelJob(jobId, s"Access to Lake $lakepath is DISABLED")
      }
    }
  }
}
sator_aa
  • 1
  • 1

0 Answers0