
I'm trying to parse a set of XML files using Scala and Spark. From the files I get data for 'n' dataframes (i.e. the number of dataframes doesn't vary, only the number of files varies).

I'm parsing the XML files and storing the data in a ListBuffer[ListBuffer[String]]. Each ListBuffer[String] contains the data for one dataframe, for example:

ListBuffer(
    ListBuffer("1|2|3|4", "5|6|7|8"),
    ListBuffer("a|b|c|d", "e|f|g|h"),
    ListBuffer("q|w|e|r", "w|x|y|z")
)

This would create 3 dataframes:

Dataframe1:
 col1  col2  col3  col4
   1     2     3     4
   5     6     7     8

and similarly for the other 2 dataframes.

I cannot directly convert the XML to dataframes, as there is a lot of custom handling to be done on the data before making the dataframes.

I'm converting each ListBuffer into a dataframe using the following code:

finalListBuffer.foreach { data =>

    columns = FunctionToReturnColumnsList()
    // Build a schema of nullable string columns
    val schema = StructType(columns.map(field => StructField(field, StringType, true)))
    // "|" has to be escaped, since String.split takes a regex
    val dataRDD: RDD[Row] = sparkSess.sparkContext.parallelize(data.toStream.map(l => Row.fromSeq(l.split("\\|", -1))))
    val df = sparkSess.createDataFrame(dataRDD, schema)
    ...
}

After this step, some operations are performed on each dataframe (some operations have inter-dataframe dependencies, so I can't just process one dataframe and then write it out), and finally the dataframes are written using the following code:

df.repartition(1).write.mode("Overwrite").option("multiline", "true").option("delimiter", "\u0017").csv(filename)

While doing these steps, I'm getting 2 issues when the input file size is large:

1) GC overhead limit exceeded while creating the dataframe (the step where the dataRDD variable is created).

2) Spark heartbeat timeout error while writing the df.

How can I solve these issues?

I was thinking about using a ListBuffer[RDD[String]] initially, instead of ListBuffer[ListBuffer[String]].

But there can be as many as 1 million files, and each file can have up to 10-20 entries for a df. What I'm doing is: I list all the files, process each of them one by one, and append their results to a main ListBuffer. So if I use RDDs, I'll have to call union for each file, and this can be expensive, roughly as in the sketch below. What else could be done?
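
For illustration, this is roughly what I mean (listAllXmlFiles, parseXmlFile and n are placeholders for my file listing, my existing per-file parsing function, and the number of dataframes):

import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD

// Rough sketch only: one RDD per dataframe, grown by a union() for every input file.
val files: Seq[String] = listAllXmlFiles()                     // placeholder
val perDfRdds: ListBuffer[RDD[String]] =
  ListBuffer.fill(n)(sparkSess.sparkContext.emptyRDD[String])  // n is around 10-30

files.foreach { file =>
  val parsed: ListBuffer[ListBuffer[String]] = parseXmlFile(file)  // placeholder
  parsed.zipWithIndex.foreach { case (rows, i) =>
    // one union per file and per dataframe, which is the part that looks expensive
    perDfRdds(i) = perDfRdds(i).union(sparkSess.sparkContext.parallelize(rows))
  }
}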

ezvine
  • So 10 files, each file has >1 DF in it, and each DF has 10-20 entries. Is this right? How many DF in each file? – mikelegg Oct 31 '19 at 14:15
  • No. In total there are only 'n' DFs, for which the data comes from the 10 files. Each file has 10-20 entries for each DF. The number of DFs doesn't vary with the number of files; it is constant, i.e. 'n'. – ezvine Oct 31 '19 at 14:30
  • OK so what is the value of 'n'? – mikelegg Oct 31 '19 at 14:41
  • It is around 20. It can vary, like from 10-30 – ezvine Oct 31 '19 at 14:43
  • It is not much data to go to the bother of using Spark over, but I assume you know that. Also it's not a lot of data to be running out of memory. Have you tried setting the Xmx Xms options to up your heap size? – mikelegg Oct 31 '19 at 14:52
  • In the actual case it is not 10 files, it is 1 million, and each df can have up to 10 million rows at the end of parsing. And no, I haven't tried the Xmx option in the spark-submit command. – ezvine Oct 31 '19 at 15:12
  • I can't help thinking with that many files it will be better to start with a RDD where each element is the representation of the file (the root ListBuffer) then make projections off of that to split into the 'n' dataframes you want – mikelegg Oct 31 '19 at 16:27
  • So are you telling me to initially read the files as an `RDD[String]` and then apply a map function to it? Could you please give a small example if possible? Actually I have a function that returns a `ListBuffer[ListBuffer[String]]` when passed an XML file. Currently I am calling this function for each XML file. I can make it return a `ListBuffer[RDD[String]]` instead, if that would help. – ezvine Oct 31 '19 at 21:10
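
A rough sketch of the idea described in the comments above (reading every file into a single RDD first, then projecting the 'n' dataframes out of it) might look like the following; parseXml is only a placeholder for the existing per-file parsing logic and would have to be serializable:

import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.StructType

// One (path, fileContent) pair per XML file, read on the executors.
val rawFiles: RDD[(String, String)] = sparkSess.sparkContext.wholeTextFiles("/path/to/xml/dir")

// parseXml is a placeholder: it turns one file's content into one ListBuffer[String] per dataframe.
val parsedPerFile: RDD[ListBuffer[ListBuffer[String]]] =
  rawFiles.map { case (_, content) => parseXml(content) }
parsedPerFile.cache()

// Project the i-th dataframe's rows out of every file, with no per-file union loop.
def dfForIndex(i: Int, schema: StructType): DataFrame = {
  val rows = parsedPerFile
    .flatMap(perFile => perFile(i))
    .map(line => Row.fromSeq(line.split("\\|", -1)))
  sparkSess.createDataFrame(rows, schema)
}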

1 Answer

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer

scala> val lbs = ListBuffer(
     |     ListBuffer("1|2|3|4","5|6|7|8"),
     |     ListBuffer("a|b|c|d","e|f|g|h"),
     |     ListBuffer("q|w|e|r","w|x|y|z")
     | )
lbs: scala.collection.mutable.ListBuffer[scala.collection.mutable.ListBuffer[String]] = ListBuffer(ListBuffer(1|2|3|4, 5|6|7|8), ListBuffer(a|b|c|d, e|f|g|h), ListBuffer(q|w|e|r, w|x|y|z))


scala> val schema = StructType(Seq(StructField("c1", StringType, true),StructField("c2", StringType, true),StructField("c3", StringType, true),StructField("c4", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(c1,StringType,true), StructField(c2,StringType,true), StructField(c3,StringType,true), StructField(c4,StringType,true))

scala> var lb_df: ListBuffer[DataFrame] = ListBuffer()
lb_df: scala.collection.mutable.ListBuffer[org.apache.spark.sql.DataFrame] = ListBuffer()

scala> def createDF(lb: ListBuffer[String]) = spark.createDataFrame(spark.sparkContext.parallelize(lb.toSeq).map(_.toString.split("\\|")).map(Row(_: _*)), schema)
createDF: (lb: scala.collection.mutable.ListBuffer[String])org.apache.spark.sql.DataFrame


scala> lbs.foreach(lb => lb_df.append(createDF(lb)))

scala> lb_df.foreach(_.show())
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  1|  2|  3|  4|
|  5|  6|  7|  8|
+---+---+---+---+

+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  a|  b|  c|  d|
|  e|  f|  g|  h|
+---+---+---+---+

+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  q|  w|  e|  r|
|  w|  x|  y|  z|
+---+---+---+---+

I hope this is helpful.

Ajay Ahuja
  • Could you please explain how this will help in tackling the problems I have mentioned? Does writing the createDF function help with the GC overhead limit error? – ezvine Oct 31 '19 at 21:15
  • What Spark properties are you setting while running this code? You may have to increase executor and driver memory, and also the number of executor cores and the overhead memory. You may refer to my answer here for the Spark properties - https://stackoverflow.com/questions/26168254/how-to-set-amount-of-spark-executors/56720485#56720485 – Ajay Ahuja Nov 01 '19 at 13:33
  • These are the properties set: `spark.submit.deployMode=cluster, spark.yarn.maxAppAttempts=1, spark.executor.instances=1, spark.executor.memoryOverhead=17408, spark.executor.memory=148G, spark.yarn.driver.memoryOverhead=17408, spark.driver.memory=148G, spark.executor.cores=32, spark.driver.cores=32, spark.default.parallelism=64`. I'm using only one executor so that driver memory is at the maximum, because the whole ListBuffer will be stored in driver memory, as per my understanding. – ezvine Nov 03 '19 at 09:07