I have a text file that contains a list of files. Currently, I iterate through the file list sequentially.
My file list looks like this:
D:\Users\bramasam\Documents\sampleFile1.txt
D:\Users\Documents\sampleFile2.txt
and I execute the code below for each file:
val df = spark.read
  .format("org.apache.spark.csv")
  .option("header", false)
  .option("inferSchema", false)
  .option("delimiter", "|")
  .schema(StructType(fields)) // calling a method to find schema
  .csv(fileName_fetched_foreach)
  .toDF(old_column_string: _*)
df.write.format("orc").save(target_file_location)
What I am trying to do is run the above code for each file in parallel instead of sequentially, because there is no dependency between the files. I tried something like the code below, but I am facing errors:
// read the file which has the file list
spark.read.textFile("D:\\Users\\Documents\\ORC\\fileList.txt").foreach { line =>
  // + 1 skips the path separator itself, so the name does not start with a backslash
  val tempTableName = line.substring(line.lastIndexOf("\\") + 1, line.lastIndexOf("."))
  val df = spark.read
    .format("org.apache.spark.csv")
    .option("header", false)
    .option("inferSchema", false)
    .option("delimiter", "|")
    .schema(StructType(fields))
    .csv(line)
    .toDF(old_column_string: _*)
    .registerTempTable(tempTableName)
  // reorder the columns into the order in which they have to be stored
  val result = spark.sql(s"select $new_column_string from $tempTableName")
  // Note: writing to ORC needs Hive support, so make sure the syntax is right
  result.write.format("orc").save("D:\\Users\\bramasam\\Documents\\SCB\\ORCFile")
}
}
I get the following error:
java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:135)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:133)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:645)
at ConvertToOrc$$anonfun$main$1.apply(ConvertToOrc.scala:25)
at ConvertToOrc$$anonfun$main$1.apply(ConvertToOrc.scala:23)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
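
For context, the trace above is consistent with spark.read being invoked inside the foreach closure: Dataset.foreach runs its body on the executors, where the SparkSession is not usable, so the usual direction is to bring the file list back to the driver and submit one job per file from there. Below is a minimal sketch of that driver-side pattern using plain Scala Futures. ParallelConvert, runAll, and the parallelism parameter are hypothetical names introduced for illustration, and the per-file work is passed in as a function so the sketch stays independent of the Spark code in the question.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical helper, not part of Spark: runs independent per-file tasks
// concurrently on a fixed-size thread pool that lives on the driver.
object ParallelConvert {

  // Applies `convert` to every path using at most `parallelism` threads and
  // returns the paths once all tasks have finished. If any task throws,
  // the exception is rethrown here after the pool has been shut down.
  def runAll(paths: Seq[String], parallelism: Int)(convert: String => Unit): Seq[String] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One Future per file; each one would submit its own Spark job.
      val jobs = paths.map(path => Future { convert(path); path })
      Await.result(Future.sequence(jobs), Duration.Inf)
    } finally {
      pool.shutdown()
    }
  }
}
```

In the job above, the list could be brought to the driver with spark.read.textFile(...).collect() and the existing read/write chain passed as the convert function. Spark accepts jobs submitted concurrently from several driver threads, so the session is only ever touched on the driver.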