I have created a fixed-width file import parser in Spark and run a few execution tests on various datasets. It works fine up to 1000 columns, but as the number of columns and the fixed-width record length increase, Spark job performance degrades rapidly. It takes a very long time to execute on 20k columns with a record length of more than 100 thousand.

What are the possible reasons for this? How can I improve the performance?

A similar issue I found:

http://apache-spark-developers-list.1001551.n3.nabble.com/Performance-Spark-DataFrame-is-slow-with-wide-data-Polynomial-complexity-on-the-number-of-columns-is-td24635.html

zero323
katty
  • Could you share detailed data about the experiments: a) the timings, b) the data types, c) the size of the dataset, etc.? – Avishek Bhattacharya Sep 15 '18 at 09:09
  • The dataset is about 200 GB of fixed-width records with a record length of more than 120,000, and 20k columns are to be extracted based on the widths provided in a JSON schema file. The same code worked fine on a 1000-column file with a record length of 10,000, but execution time increases rapidly as the number of columns and the record length grow. I ran on YARN with 30 executors, 16 GB executor memory and 16 GB driver memory; the job failed with containers exiting with status 143. – katty Sep 15 '18 at 09:39
  • Do you really need to load all 20k columns for the computation? In practical projects, we rarely need all of the columns. – tauitdnmd Sep 15 '18 at 09:52
  • The end users are not sure how many columns they need; there is a large base of users from different departments who will use the data. I tried selecting just a few columns and writing to Parquet, but it still takes the same time as with all columns. Maybe the code takes the same time to scan the records in the RDD: it scans over the large string and then substrings the columns based on the widths provided in the JSON file. For now I need all the columns in Parquet. Please suggest any performance tips, for example for reading into the RDD or for partitioning. The job runs for 4 hours and times out with container failures (status 143). – katty Sep 15 '18 at 10:06

1 Answer

If you have a large number of columns, it is better to read/convert each record as an array and use the slice function to map it to individual columns. Using substring to extract each column individually will not be as efficient.

EDIT 1:

I used an Array[String] as an example and mapped it to a case class Record() in Scala. You can extend it to HDFS text files.

scala> case class Record(a1:String,a2:Int,a3:java.time.LocalDate)
defined class Record

scala>  val x = sc.parallelize(Array("abcd1232018-01-01","defg4562018-02-01"))
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val y = x.map( a => Record( a.slice(0,4), a.slice(4,4+3).toInt,java.time.LocalDate.parse(a.slice(7,7+10))))
y: org.apache.spark.rdd.RDD[Record] = MapPartitionsRDD[4] at map at <console>:27

scala> y.collect()
res3: Array[Record] = Array(Record(abcd,123,2018-01-01), Record(defg,456,2018-02-01))

scala>
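
For thousands of columns, writing one slice call per field by hand is not practical. A minimal sketch of how the same idea might be generalized follows, assuming the column widths have already been read from the JSON schema into a Seq[Int]. The names FixedWidth, offsets and split are hypothetical helpers for illustration, not part of Spark or of the original code.

```scala
// Hypothetical helper (names are illustrative, not a Spark API).
// Assumption: column widths were already loaded from the JSON schema.
object FixedWidth {

  /** Running start offsets of every column; the last element is the total record length. */
  def offsets(widths: Seq[Int]): Array[Int] =
    widths.scanLeft(0)(_ + _).toArray

  /** Cuts one fixed-width line into an array of column values using precomputed offsets. */
  def split(line: String, offs: Array[Int]): Array[String] = {
    val n = offs.length - 1
    val out = new Array[String](n)
    var i = 0
    // A plain while loop avoids allocating intermediate collections per record.
    while (i < n) {
      out(i) = line.slice(offs(i), offs(i + 1))
      i += 1
    }
    out
  }
}
```

With the widths 4, 3 and 10 from the example above, FixedWidth.offsets(Seq(4, 3, 10)) gives the boundaries 0, 4, 7, 17, and FixedWidth.split("abcd1232018-01-01", offs) yields the three fields abcd, 123 and 2018-01-01. In a Spark job the offsets would be computed once on the driver and used inside something like rdd.map(line => FixedWidth.split(line, offs)), so that each record is cut in a single pass. Whether this removes the slowdown at 20k columns is not verified here.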
stack0114106
  • Currently I am reading each record as an RDD string and then using the substring function to split the columns. Can you please show some sample code, in Scala or Java, that reads the record as an array? – katty Sep 20 '18 at 10:17
  • I edited the answer to show an example in Scala. Please tailor it to your text file. – stack0114106 Sep 20 '18 at 11:04