
I have to read certain files from S3, so I created a CSV file containing the paths of those files on S3. I am reading the created CSV file using the code below:

val listofFilesRDD = sparkSession.read.textFile("s3://"+ file)

This works fine. Then I try to read each of those paths and create a DataFrame like:

listofFilesRDD.foreach(iter => {
  val pathDF = sparkSession.read
    .schema(testSchema)
    .option("header", true)
    .csv("s3://" + iter)

  pathDF.printSchema()
})

but the above code throws a NullPointerException.

So, how can I fix the above code?

Uday Shankar Singh

3 Answers


You can solve this problem by collecting the S3 file paths into an array on the driver, then iterating over that array and creating a DataFrame for each path, as below:

val listofFilesRDD = sparkSession.read.textFile("s3://" + file)
val listOfPaths = listofFilesRDD.collect()

listOfPaths.foreach(iter => {
  val pathDF = sparkSession.read
    .schema(testSchema)
    .option("header", true)
    .csv("s3://" + iter)

  pathDF.printSchema()
})
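As a side note (a sketch, not part of the original answer): since all the files share `testSchema`, Spark's `DataFrameReader.csv` also accepts multiple paths as varargs, so the collected paths can be read into a single DataFrame in one call. The helper `toS3Paths` below is a hypothetical name introduced for illustration:

```scala
// Hypothetical helper: turn the bare keys from the CSV into full S3 URIs.
def toS3Paths(keys: Seq[String]): Seq[String] = keys.map("s3://" + _)

// Sketch, assuming sparkSession, testSchema and listOfPaths from the answer:
//   val allDF = sparkSession.read
//     .schema(testSchema)
//     .option("header", true)
//     .csv(toS3Paths(listOfPaths.toSeq): _*)  // one DataFrame for all files
//   allDF.printSchema()
```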
Sandeep Purohit

You cannot access an RDD from inside another RDD's transformation or action — that's the rule here. The SparkSession exists only on the driver, so calling `sparkSession.read` inside `foreach` on an RDD/Dataset runs on the executors, where that reference is null, hence the NullPointerException. You have to restructure your logic, for example by collecting the paths to the driver first.

You can find more about it here: NullPointerException in Scala Spark, appears to be caused be collection type?
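To make the rule concrete, here is a minimal pure-Scala sketch of the safe pattern (the names `readAllOnDriver`, `paths`, and `readOne` are illustrative, not from the answer): collect the small list of paths to the driver, then loop over it there, so no executor code ever touches the SparkSession.

```scala
// Safe pattern: this loop runs on the driver, so each readOne(path)
// call may freely use driver-only objects such as the SparkSession.
def readAllOnDriver(paths: Seq[String], readOne: String => Unit): Unit =
  paths.foreach(readOne)

// In Spark terms (sketch): collect() the Dataset of paths to the driver
// FIRST, then iterate — never call sparkSession.read inside rdd.foreach:
//   val paths = listofFilesRDD.collect()
//   readAllOnDriver(paths, p => sparkSession.read.csv("s3://" + p).printSchema())
```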

Shivansh

If anyone encounters this problem with a DataFrame, the same approach solves it:

def parameterjsonParser(queryDF: DataFrame, spark: SparkSession): Unit = {
  queryDF.show()
  // Collect the rows to the driver before touching the SparkSession again
  val otherDF = queryDF.collect()
  otherDF.foreach { row =>
    row.toSeq.foreach { col =>
      println(col)
      mainJsonParser(col.toString, spark)
    }
  }
}

Thank you @Sandeep Purohit

Beyhan Gul