I have to generate 3,000,000 files as the output of a Spark job.

I have two input files:

File 1 -> Size=3.3 Compressed, No. of records=13979835
File 2 -> Size=1.g Compressed, No. of records=6170229

The Spark job does the following:

  1. Reads both files and joins them on a common column (column1). -> DataFrame-A
  2. Groups DataFrame-A by column2. -> DataFrame-B
  3. Applies array_join to the aggregated column of DataFrame-B, separating its elements with the '\n' character. -> DataFrame-C
  4. Writes DataFrame-C partitioned by column2.

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.{array_join, col, collect_list, concat_ws}

    val DF1 = sparkSession.read.json("FILE1") //    |ID     |isHighway|isRamp|pvId      |linkIdx|ffs |length            |
    val DF2 = sparkSession.read.json("FILE2") //    |lId    |pid       |

    // Step 1: join on the common column, pvId (FILE1) = lId (FILE2) -> DataFrame-A
    val joinExpression = DF1.col("pvId") === DF2.col("lId")
    val DFA = DF1.join(DF2, joinExpression, "inner").select(col("ID").as("SCAR"), col("lId"), col("length"), col("ffs"), col("ar"), col("pid")).orderBy("linkIdx")
    // Step 2: build one comma-separated line per link, then collect the lines per SCAR -> DataFrame-B
    val DFB = DFA.select(col("SCAR"), concat_ws(",", col("lId"), col("length"), col("ffs"), col("ar"), col("pid")).as("links")).groupBy("SCAR").agg(collect_list("links").as("links"))

    // Step 3: join the collected lines with '\n' -> DataFrame-C
    val DFC = DFB.select(col("SCAR"), array_join(col("links"), "\n").as("links"))
    // Step 4: write one output directory per SCAR value
    DFC.write.option("quote", "\u0000").partitionBy("SCAR").mode(SaveMode.Append).format("csv").save("/tmp")
    

1 Answer

After running some tests, I got the idea of running this job in batches, like:

  • query startIdx: 0, endIndex:100000
  • query startIdx: 100000, endIndex:200000
  • query startIdx: 200000, endIndex:300000

and so on, until

  • query startIdx: 2900000, endIndex:3000000
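
The batch boundaries listed above can be generated rather than typed out by hand. Below is a minimal sketch in plain Scala, assuming half-open ranges [startIdx, endIdx) and a fixed batch size of 100000. The object `BatchRanges` and the helper `batchRanges` are hypothetical names used for illustration only, and how an index is assigned to each output key is not specified here, so the loop body just prints the range where the real job would run one query.

```scala
// Hypothetical helper (not from the original job): splits [0, total) into
// consecutive half-open (startIdx, endIdx) batches of at most batchSize.
object BatchRanges {
  def batchRanges(total: Int, batchSize: Int): Seq[(Int, Int)] = {
    require(total >= 0 && batchSize > 0, "total must be >= 0 and batchSize > 0")
    (0 until total by batchSize).map { start =>
      // The last batch is clipped so it never runs past total.
      (start, math.min(start + batchSize, total))
    }
  }

  def main(args: Array[String]): Unit = {
    val ranges = batchRanges(total = 3000000, batchSize = 100000)
    ranges.foreach { case (startIdx, endIdx) =>
      // Assumption: the real job would restrict its input to this index
      // range and perform one partitioned write per batch at this point.
      println(s"query startIdx: $startIdx, endIndex: $endIdx")
    }
  }
}
```

With 3000000 outputs and a batch size of 100000 this gives 30 batches, so each run only has to create about 100000 partition directories instead of all 3000000 at once.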