I'm trying to parse a set of XML files using Scala and Spark. I get data for 'n' dataframes from the files (i.e. the number of dataframes doesn't vary, only the number of files does). While parsing, I store the data in a ListBuffer[ListBuffer[String]], where each inner ListBuffer[String] contains the data for one dataframe.
For example:
ListBuffer(
  ListBuffer("1|2|3|4", "5|6|7|8"),
  ListBuffer("a|b|c|d", "e|f|g|h"),
  ListBuffer("q|w|e|r", "w|x|y|z")
)
This would create 3 dataframes:
Dataframe1:
col1 col2 col3 col4
1    2    3    4
5    6    7    8
and similarly for the other 2 dataframes.
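Each pipe-delimited string becomes one row of the dataframe, for example:

// String.split takes a regex, so the literal pipe has to be escaped
"1|2|3|4".split("\\|", -1)  // Array("1", "2", "3", "4")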
I cannot directly convert the XML to a dataframe, as there is a lot of custom handling to be done on the data before the dataframes are built.
I'm converting each ListBuffer into a dataframe using the following code:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

finalListBuffer.foreach { data =>
  val columns = FunctionToReturnColumnsList()
  val schema = StructType(columns.map(field => StructField(field, StringType, nullable = true)))
  // String.split takes a regex, so the literal pipe has to be escaped
  val dataRDD: RDD[Row] = sparkSess.sparkContext.parallelize(
    data.toStream.map(l => Row.fromSeq(l.split("\\|", -1))))
  val df = sparkSess.createDataFrame(dataRDD, schema)
  ...
}
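Would moving the Row construction onto the executors help with the driver-side GC pressure? A sketch of what I mean (reusing the schema from above):

// parallelize the raw pipe-delimited strings first...
val rawRDD = sparkSess.sparkContext.parallelize(data)
// ...so the split and Row construction happen on the executors, not the driver
val dataRDD: RDD[Row] = rawRDD.map(l => Row.fromSeq(l.split("\\|", -1)))
val df = sparkSess.createDataFrame(dataRDD, schema)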
After this step, some operations are performed on each dataframe (some operations have inter-dataframe dependencies, so I can't just process one dataframe and then write it out), and finally the dataframes are written using the following code:
df.repartition(1).write.mode("Overwrite").option("multiline", "true").option("delimiter", "\u0017").csv(filename)
While doing these steps, I'm getting 2 issues when the input size is high:
1) GC overhead limit exceeded while creating the dataframe (the step where the dataRDD variable is created).
2) Spark heartbeat timeout error while writing the df.
How can I solve these issues?
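For the heartbeat timeout specifically, would raising the timeouts be a reasonable stopgap while I fix the memory pressure? Something like this (the values are just illustrative):

import org.apache.spark.sql.SparkSession

val sparkSess = SparkSession.builder()
  .appName("xml-parser")  // hypothetical app name
  .config("spark.executor.heartbeatInterval", "60s")
  .config("spark.network.timeout", "600s")  // must stay larger than the heartbeat interval
  .getOrCreate()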
I was initially thinking about using ListBuffer[RDD[String]] instead of ListBuffer[ListBuffer[String]], but there can be as many as 1 million files, and each file can have up to 10-20 entries for a df. What I'm doing is listing all the files, processing them one by one, and appending each result to a main ListBuffer. So if I use RDDs, I'll have to call union once per file, and that can be expensive. What else could be done?
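If I did go the RDD route, would a single SparkContext.union over the whole list (one union per dataframe, instead of a chain of pairwise unions per file) keep the cost down? A sketch of what I have in mind (files and parseFileToLines stand in for my listing and per-file parsing logic):

import org.apache.spark.rdd.RDD

// one small RDD of raw pipe-delimited lines per input file (parseFileToLines is hypothetical)
val perFileRdds: Seq[RDD[String]] = files.map(f => parseFileToLines(f))
// SparkContext.union merges the whole list in one call instead of a long union chain
val allLines: RDD[String] = sparkSess.sparkContext.union(perFileRdds)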