
I have 620 CSV files, each with different columns and data. For example:

//file1.csv
word, count1
w1, 100
w2, 200

//file2.csv
word, count2
w1, 12
w5, 22

//Similarly fileN.csv
word, countN
w7, 17
w2, 28

My expected output

//result.csv
word, count1, count2, countN
w1,    100,     12,   null
w2,    200,    null,    28
w5,    null,    22,   null
w7,    null,   null,    17

I was able to do it in Scala for two files like this, where df1 is file1.csv and df2 is file2.csv:

df1.join(df2, Seq("word"),"fullouter").show()

I need a solution, either in Scala or as a Linux command, to do this.


1 Answer


Using Spark, you can read each of your files as a DataFrame and store them in a List[DataFrame]. After that, you can apply reduce on that list to join all the DataFrames together. The following code uses three DataFrames, but you can extend the same approach to all your files.

//create all three dummy DFs (toDF requires the SQL implicits, which spark-shell imports automatically)
val df1 = sc.parallelize(Seq(("w1", 100), ("w2", 200))).toDF("word", "count1")
val df2 = sc.parallelize(Seq(("w1", 12), ("w5", 22))).toDF("word", "count2")
val df3 = sc.parallelize(Seq(("w7", 17), ("w2", 28))).toDF("word", "count3")

//store all DFs in a list
import org.apache.spark.sql.DataFrame
val dfList: List[DataFrame] = List(df1, df2, df3)

//apply reduce function to join them together
val joinedDF = dfList.reduce((a, b) => a.join(b, Seq("word"), "fullouter"))

joinedDF.show()
//output
//+----+------+------+------+
//|word|count1|count2|count3|
//+----+------+------+------+
//|  w1|   100|    12|  null|
//|  w2|   200|  null|    28|
//|  w5|  null|    22|  null|
//|  w7|  null|  null|    17|
//+----+------+------+------+

//To write to a CSV file (Spark writes a directory of part files at this path)
joinedDF.write
  .option("header", "true")
  .csv("PATH_OF_CSV")

This is how you can read all your files and store them in a List:

//declare a ListBuffer to store all DFs
import scala.collection.mutable.ListBuffer
val dfList = ListBuffer[DataFrame]()

(1 to 620).foreach { x =>
  val df: DataFrame = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .load(BASE_PATH + s"file$x.csv")

  dfList += df
}
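
As a side note, the same list can also be built without a mutable buffer. A minimal sketch, assuming Spark 2.x with a SparkSession named spark, the built-in CSV reader, and the same BASE_PATH and sequential file1.csv ... file620.csv naming:

//functional alternative: map the indices straight to a List[DataFrame]
import org.apache.spark.sql.DataFrame

val dfList: List[DataFrame] = (1 to 620).toList.map { x =>
  spark.read
    .option("header", "true")        //first line of each file is the header
    .option("inferSchema", "true")   //so the count columns come in as numbers, not strings
    .csv(BASE_PATH + s"file$x.csv")
}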
  • Do you have any suggestion how to read 620 csv files in a loop? My file names are sequential. – Abu Shoeb Mar 15 '18 at 07:20
  • Great to hear that :) – vindev Mar 15 '18 at 07:57
  • After running the reduce function for 8 minutes, I couldn't see the result of joinedDf or write it to csv. It shows java.lang.StackOverflowError. Any idea? The reduce instruction finished successfully. – Abu Shoeb Mar 15 '18 at 08:05
  • Where exactly are you getting the SO? It may be because of a large DAG, as we are joining lots of DFs. Check this out: https://stackoverflow.com/questions/37909444/spark-java-lang-stackoverflowerror – vindev Mar 15 '18 at 08:25
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/166875/discussion-between-abu-shoeb-and-vindev). – Abu Shoeb Mar 15 '18 at 09:02
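
A note on the StackOverflowError mentioned in the comments: chaining ~620 full outer joins builds a very deep lineage (DAG), which is the usual cause of that error. One workaround, not part of the original answer and assuming Spark 2.1+ where Dataset.checkpoint is available, is to checkpoint the intermediate result every few joins to truncate the lineage. A rough sketch, with an arbitrary interval of 50 and an assumed checkpoint directory:

//needed once so checkpoints have somewhere to be written (assumed path)
sc.setCheckpointDir("/tmp/spark-checkpoints")

//same full outer joins as above, but cut the lineage every 50 joins
val joinedDF = dfList.tail.zipWithIndex.foldLeft(dfList.head) {
  case (acc, (df, i)) =>
    val joined = acc.join(df, Seq("word"), "fullouter")
    if ((i + 1) % 50 == 0) joined.checkpoint() else joined
}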