
I'm running a Spark job in the spark-shell that has been executing for more than 80 hours, and there has to be some way to parallelize it. Here are the options I passed when starting the shell and the code that is being run.

spark-shell --master \
yarn \
--num-executors 100 \
--name cde_test \
--executor-cores 4 \
--executor-memory 5g \
--driver-cores 2 \
--driver-memory 3g \
--jars ./spark_jars/spark-xml_2.11-0.8.0.jar \
--verbose

Here's a pic of the executors info on the resource manager UI tool: spark_ui_executor_screenshot

I want to parse XML files with spark-xml, extract certain fields, and save them to CSV. I thought increasing the number of executors would speed up the job, since these are small, quick, low-memory tasks, but I'm not sure whether I configured it correctly or whether the way the code is written prevents parallel execution. The code is below; any and all help is appreciated.

import org.apache.hadoop.fs._
import collection.mutable._
import spark.implicits._
import java.io.File
import java.util.regex.Pattern
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.util.control.Exception._
import org.apache.commons.io.FilenameUtils  
import org.apache.commons.lang.StringEscapeUtils
import org.apache.hadoop.conf.Configuration

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}

object HdfsUtils {
  def pathExists(path: String, sc: SparkContext): Boolean = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(conf)
    fs.exists(new Path(path))
  }

  def getFullPath(path:String, sc: SparkContext): String = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(conf)
    fs.getFileStatus(new Path(path)).getPath().toString
  }

  def getAllFiles(path:String, sc: SparkContext): Seq[String] = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(conf)
    val files = fs.listStatus(new Path(path))
    files.map(_.getPath().toString)
  }
}

//Four different mapping functions
val path_list = Seq("path_1_for_first_directory",
"path_2_for_second_directory")

path_list.foreach ( path => {
  val hdfs_directory = HdfsUtils.getAllFiles(path, sc)

  hdfs_directory.foreach( intermediate_folder => {
    val intermediate_folders = HdfsUtils.getAllFiles(intermediate_folder, sc)

    intermediate_folders.foreach( final_folder => {
      val hdfs_files = HdfsUtils.getAllFiles(final_folder, sc)

      hdfs_files.foreach( xml_file => {

        val date = raw"(\d{4})-(\d{2})-(\d{2})".r
        val directory_date = date.findFirstIn(xml_file).
          getOrElse(xml_file)

        //Ignore meta files
        if (xml_file.contains("META") || xml_file.contains("meta")){

        } else if (xml_file.contains(".xml") || xml_file.contains(".XML")){

          try {

            val xml_df = spark.
              read.
              format("xml").
              option("rowTag","root").
              option("treatEmptyValuesAsNulls","true").
              option("nullValue", null).
              option("emptyValue", null).
              load(xml_file)

            val info_df = xml_df.
              select(
                substring($"column_1",0,8).alias("date"),
                substring($"column_2",9,20).alias("time"),
                $"column_3".alias("first_name").cast("string"),
                $"column_4".alias("last_name").cast("string"),
                $"column_5".alias("birthday").cast("string"),
                $"column_6".alias("street").cast("string"),
                $"column_7".alias("city").cast("string"),
                $"column_8".alias("state").cast("string"),
                $"column_9".alias("zip_code").cast("string"),
                $"column_10".alias("country").cast("string")
              )

            val outputfile = "/path_to_output/"
            var filename = s"$directory_date"
            var outputFileName = outputfile + filename

            info_df.write
              .format("csv")
              .option("header", "false")
              .option("sep","|")
              .mode("append")
              .save(outputFileName)

          } catch {
            case _: RuntimeException => {}
            case _: Exception => {}
          }
        }
      })
    })
  })
})
OneCricketeer
CheerSal
  • It is my understanding that you are not reading different XML files with different Spark executors in parallel. Rather, you are reading all files sequentially in the driver program, creating a new DataFrame for each one, which is a very expensive process, especially given that you are reading from HDFS. You need to make a DataFrame of all file names, filter it, then use `foreach()`. Use something else to load the XML content locally in each worker, not `spark-xml`, since the latter creates dataframes. – Hristo Iliev Mar 01 '20 at 22:17
  • Okay, do you have any suggestions for a replacement for spark-xml? Note that my cluster doesn’t have PySpark enabled so Python libraries aren’t an option. How would you go about doing what you listed out—not sure how to approach this if you think spark-xml isn’t a good choice here – CheerSal Mar 01 '20 at 23:55
  • Scala has first-class [support for XML](https://dzone.com/articles/working-with-xml-in-scala). It is extensive, but not the most performant one out there, which is why Spark XML comes with its own XML parser built on top of the Java StAX API. If your files are small, I'd say go with the Scala XML API. Otherwise, look at what's available in the `javax.xml` namespace. Or think of separating the relevant XML files so that you can read them all in bulk with Spark XML as shown in ELinda's answer. – Hristo Iliev Mar 02 '20 at 09:24
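As a rough illustration of the per-file parsing suggested in the comments above, here is a minimal sketch that turns one XML document into a pipe-delimited line using the JDK's `javax.xml` parser. The object name `XmlFields`, the method `toCsvLine`, and the default element names (`column_1` to `column_10`, mirroring the placeholders in the question) are all assumptions made for this example, not part of spark-xml or of the original code, and the `substring` handling of the first two columns is left out for brevity.

```scala
import java.io.StringReader
import javax.xml.parsers.DocumentBuilderFactory
import org.xml.sax.InputSource
import scala.util.Try

object XmlFields {
  // Hypothetical element names, mirroring the placeholder columns in the question.
  val defaultFields: Seq[String] = (1 to 10).map(i => s"column_$i")

  // Parses one XML document held in memory and returns the requested
  // elements as a single pipe-delimited line. Elements that are missing
  // become empty strings; a document that fails to parse yields None.
  def toCsvLine(xml: String, fields: Seq[String] = defaultFields): Option[String] =
    Try {
      val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
      val doc = builder.parse(new InputSource(new StringReader(xml)))
      val root = doc.getDocumentElement
      fields.map { name =>
        // Take the first descendant element with this tag name, if any.
        val nodes = root.getElementsByTagName(name)
        if (nodes.getLength > 0) nodes.item(0).getTextContent.trim else ""
      }.mkString("|")
    }.toOption
}
```

In the spark-shell, something along the lines of `sc.wholeTextFiles(glob).flatMap { case (_, xml) => XmlFields.toCsvLine(xml) }.saveAsTextFile(outputDir)` could then spread the parsing across executors, where `glob` and `outputDir` are placeholders for your own paths. `wholeTextFiles` loads each file fully into memory, so this only suits small files, as the comment above notes.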

1 Answer


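You're using `foreach` on a `Seq`, which runs sequentially on the driver (as Hristo Iliev alluded to). Each iteration launches its own small read and write, so if you have many mostly small files, the executors likely sit idle much of the time while the files are processed one at a time, and that is probably why it's slow.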

  • You can use wildcards instead of iterating through the HDFS files, so that multiple files are read into one larger DataFrame at once. For example, this processes a whole month at a time:
spark.read.format("xml").load("/somepath/*/YYYY-MM-*.xml")

Note the `/*/`, which stands for the intermediate directory. What works best for you may depend on whether the intermediate directories follow a more specific pattern, or whether they also depend on the date.
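To make the wildcard idea concrete, here is a minimal sketch of a bulk read and write for one month. It assumes the `/somepath/*/YYYY-MM-*.xml` layout from the example above, that META files do not match the pattern, and that spark-xml is on the classpath as in the question. The names `BulkXmlToCsv`, `monthGlob`, and `convert` are made up for this illustration, and only a few of the question's columns are selected to keep it short.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, substring}

object BulkXmlToCsv {
  // Builds a glob matching every XML file for one month,
  // e.g. "/somepath/*/2020-03-*.xml". The layout is an assumption.
  def monthGlob(baseDir: String, yearMonth: String): String =
    s"${baseDir.stripSuffix("/")}/*/$yearMonth-*.xml"

  // Reads all files matching the glob as ONE DataFrame, so Spark can
  // split the work across executors, then appends pipe-delimited CSV.
  def convert(spark: SparkSession, glob: String, outputDir: String): Unit = {
    val xmlDf = spark.read
      .format("xml")
      .option("rowTag", "root")
      .option("treatEmptyValuesAsNulls", "true")
      .load(glob)

    // A subset of the question's placeholder columns.
    val infoDf = xmlDf.select(
      substring(col("column_1"), 0, 8).alias("date"),
      col("column_3").cast("string").alias("first_name"),
      col("column_4").cast("string").alias("last_name")
    )

    infoDf.write
      .format("csv")
      .option("header", "false")
      .option("sep", "|")
      .mode("append")
      .save(outputDir)
  }
}
```

From the spark-shell this might be called as `BulkXmlToCsv.convert(spark, BulkXmlToCsv.monthGlob("/somepath", "2020-03"), "/path_to_output/2020-03")`, which gives one Spark job per month rather than one per file. If META files do match the glob, they would need to be moved or filtered out separately.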

ELinda