
I have a text file that contains a list of files. Currently, I iterate through this list sequentially.

My file list looks like this:

D:\Users\bramasam\Documents\sampleFile1.txt
D:\Users\Documents\sampleFile2.txt

and I execute the code below for each file:

val df = spark.read
   .format("org.apache.spark.csv")
   .option("header", false)
   .option("inferSchema", false)
   .option("delimiter", "|")
   .schema(StructType(fields)) //calling a method to find schema
   .csv(fileName_fetched_foreach)
   .toDF(old_column_string: _*)

 df.write.format("orc").save(target_file_location)

What I am trying to do is run the above code for each file in parallel instead of sequentially, because there is no dependency between the files. So I tried something like the code below, but I am getting errors:

  //read the file which has the file list
    spark.read.textFile("D:\\Users\\Documents\\ORC\\fileList.txt").foreach { line =>
      val tempTableName = line.substring(line.lastIndexOf("\\"),line.lastIndexOf("."))
      val df = spark.read
        .format("org.apache.spark.csv")
        .option("header", false)
        .option("inferSchema", false)
        .option("delimiter", "|")
        .schema(StructType(fields))
        .csv(line)
        .toDF(old_column_string: _*)
        .registerTempTable(tempTableName)

      val result = spark.sql(s"select $new_column_string from $tempTableName") //reordering column order on how it has to be stored

      //Note: writing to ORC needs Hive support. So, make sure the syntax is right
      result.write.format("orc").save("D:\\Users\\bramasam\\Documents\\SCB\\ORCFile")
    }
  }

I am getting the following error:

java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:135)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:133)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:645)
    at ConvertToOrc$$anonfun$main$1.apply(ConvertToOrc.scala:25)
    at ConvertToOrc$$anonfun$main$1.apply(ConvertToOrc.scala:23)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
Gladiator
  • Can you add some detail on what you are trying to do and for what purpose? – koiralo May 24 '18 at 10:55
  • @ShankarKoirala, I am trying to process a set of files (in this case, converting files to ORC by applying schema). But, in my current process, I am reading files sequentially by running a for loop iterating through the file names. Since the process is independent, I wanted to think of a way to achieve this in parallel by using spark's higher order functions. – Gladiator May 25 '18 at 03:59

2 Answers


What you should do is put the files in one directory and ask Spark to read the whole directory. If needed, append a column holding the source file name to each row.

spark.read.textFile("D:\\Users\\Documents\\ORC\\*")

The trailing * matches all files in the directory.
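
A minimal sketch of that suggestion, applied to the pipe-delimited files from the question rather than to plain text. The two-column schema, the object name `DirectoryToOrc`, the `source_file` column name and the output directory are placeholders made up for illustration and are not taken from the question; `input_file_name()` is the built-in Spark SQL function that returns the path of the file each row was read from.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DirectoryToOrc {
  // Hypothetical schema; replace with the real StructType(fields) from the question.
  val schema: StructType = StructType(Seq(
    StructField("id", StringType),
    StructField("name", StringType)
  ))

  // Reads every file matching inputGlob in one job and tags each row
  // with the path of the file it came from.
  def readAll(spark: SparkSession, inputGlob: String): DataFrame =
    spark.read
      .option("header", false)
      .option("delimiter", "|")
      .schema(schema)
      .csv(inputGlob)
      .withColumn("source_file", input_file_name())

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DirectoryToOrc")
      .master("local[*]") // assumption: a local run, as the Windows paths suggest
      .getOrCreate()

    readAll(spark, "D:\\Users\\Documents\\ORC\\*")
      .write
      .format("orc")
      .save("D:\\Users\\Documents\\ORC_out") // hypothetical output directory

    spark.stop()
  }
}
```

Spark splits the matched files into partitions itself, so the files are processed in parallel without any explicit loop over file names.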
Dmitry

Please see @samthebest's answer.

In your case you should pass the whole directory such as:

spark.read.textFile("D:\\Users\\Documents\\ORC")
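
If the files cannot be moved into one directory, a similar effect can probably be had from the existing list file: collect the paths on the driver and hand them all to a single reader call. This also keeps `spark.read` out of the `foreach` closure, which runs on the executors where the session is not usable; that is most likely where the NullPointerException in the question comes from. The sketch below assumes a hypothetical two-column schema, a made-up object name `FileListToOrc` and a made-up output directory; the list file path is the one from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FileListToOrc {
  // Trims each line of the list file and drops blank lines.
  def cleanPaths(lines: Seq[String]): Seq[String] =
    lines.map(_.trim).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FileListToOrc")
      .master("local[*]") // assumption: a local run
      .getOrCreate()

    // Hypothetical schema; replace with the real StructType(fields).
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("name", StringType)
    ))

    // collect() brings the (small) list of paths back to the driver.
    val paths = cleanPaths(
      spark.read.textFile("D:\\Users\\Documents\\ORC\\fileList.txt").collect().toSeq
    )

    // csv accepts several paths at once; Spark reads them in parallel within one job.
    spark.read
      .option("header", false)
      .option("delimiter", "|")
      .schema(schema)
      .csv(paths: _*)
      .write
      .format("orc")
      .save("D:\\Users\\Documents\\ORC_out") // hypothetical output directory

    spark.stop()
  }
}
```

Note that this writes all input files into one ORC output directory rather than one output per input file.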

If you also want to read files in subdirectories, refer to this answer; for example, the pattern below matches files one directory level down:

spark.read.textFile("D:\\Users\\Documents\\ORC\\*\\*")
dbustosp