
I'm trying to read a folder that consists of many small parquet files (600 files, 500 KB each) and then repartition them into 2 files.

val df = spark.read.parquet("folder")
df.repartition(2).write.mode("overwrite").parquet("output_folder")

This is horribly slow, taking up to 10 minutes. From the Spark UI I can see that 2 executors are handling 2 tasks. I give each executor 10 GB of memory.

[Spark UI screenshot]

So what is the reason for the slow speed? Is it because of disk IO? And how can I improve the performance in this case?

Edit: I also tried using coalesce and the performance doesn't look any different.
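For reference, the coalesce variant I tried is essentially the same two lines with coalesce in place of repartition:

val df = spark.read.parquet("folder")
df.coalesce(2).write.mode("overwrite").parquet("output_folder")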


2 Answers


First option: make big files out of the small parquet files at the source level, i.e. merge them together into files larger than 128 MB (or whatever size you want):

how to merge multiple parquet files to single parquet file using linux or hdfs command?

Second option, i.e. using Spark: read the small parquet files and then, before the actual business processing logic, write them out as relatively big files of the size you expect (taking performance factors into consideration).


Second option:

I am not aware of your exact Spark job configuration, but in general coalesce should work. Try it like the example below (master -> local, but change it to yarn for your app), which worked for me. In this example I took small files "./userdata*.parquet" (5 small files, all around 110 KB) under src/main/resources and merged them into 2 final files with coalesce...

Approach: read each parquet file as a DataFrame, union them into a single DataFrame, and then coalesce it.

package com.examples

import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/** *
  * take small pegs and make a large peg
  * and coalesce it
  *
  * @author : Ram Ghadiyaram
  */
object ParquetPlay extends Logging {
  Logger.getLogger("org").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {

    val appName = if (args.length > 0) args(0) else this.getClass.getName
    val spark: SparkSession = SparkSession.builder
      .config("spark.master", "local") // change to yarn for your app
      .appName(appName)
      .getOrCreate()

    // list the small parquet files with a glob pattern
    // (FileSystem.globStatus(Path pathPattern) returns FileStatus[])
    val fs = FileSystem.get(new Configuration())
    val files = fs.globStatus(new Path("./userdata*.parquet")).map(_.getPath.toString)
    println(files.length)
    files.foreach(println)

    // read each small file as a DataFrame and union them into a single DataFrame
    val dfSeq: Seq[DataFrame] = files.map(file => spark.read.parquet(file).toDF()).toSeq
    println(dfSeq.length)
    val finalDF = dfSeq.reduce(_ union _)
    finalDF.show(false)

    // coalesce to 2 partitions and write them out as 2 parquet files
    println(System.getProperty("java.io.tmpdir"))
    finalDF.coalesce(2)
      .write
      .mode(SaveMode.Overwrite)
      .parquet(s"${System.getProperty("java.io.tmpdir")}/final.parquet")
    println("done")
  }
}

Result: 2 nearly equal sized files, like below. In this example it again generated small files, but in your case, since you have around 600 files of 500 KB each, you can look at the total size and decide on coalesce(<number of partitions you expect>); a rough sizing sketch follows the screenshot below.

[Screenshot: two output parquet files of nearly equal size]
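As a rough illustration (my own sketch, not part of the original answer), you can estimate the coalesce count from the total input size and a target output file size; the numbers below are the ones from the question plus a hypothetical 128 MB target:

// Rough sizing sketch: ~600 files of ~500 KB each, targeting ~128 MB output files.
// Note: parquet is compressed on disk, so the written size can differ from this estimate.
val totalInputBytes = 600L * 500 * 1024
val targetFileBytes = 128L * 1024 * 1024
val numPartitions = math.max(1, math.ceil(totalInputBytes.toDouble / targetFileBytes).toInt)

val df = spark.read.parquet("folder")
df.coalesce(numPartitions)
  .write
  .mode("overwrite")
  .parquet("output_folder")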

Third option: as Minh (the original poster) mentioned in the comments, the files might actually be big files that are highly compressed, i.e. they only look small on disk, and that might be what is causing this.
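One rough way to verify this (a sketch I am adding for illustration, not part of the original answer) is to cache the DataFrame and compare the in-memory size shown in the Spark UI Storage tab with the on-disk size of the input folder:

import org.apache.hadoop.fs.{FileSystem, Path}

// "folder" is the questioner's input path (assumption)
val df = spark.read.parquet("folder")
df.cache()
df.count() // materializes the cache; check the Spark UI "Storage" tab for the deserialized size

// on-disk size of the same folder via the Hadoop FileSystem API
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val onDiskBytes = fs.getContentSummary(new Path("folder")).getLength
println(s"on-disk bytes: $onDiskBytes")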

– Ram Ghadiyaram
  • Certainly it works in both your code and my code. The problem is that it is **slow**. That is what I'm trying to tackle. – Minh Thai Oct 03 '18 at 06:54
  • AFAIK there is no other approach I know of apart from the above 2. Have you tried the above way also? What are your configurations in spark-submit? – Ram Ghadiyaram Oct 03 '18 at 06:57
  • I give each executor 5 cores and 10GB of memory. The number of executors doesn't seem to be relevant since in Spark UI, there are only 2 tasks assigned to 2 executors. – Minh Thai Oct 03 '18 at 07:02
  • Yes, the number of executors is not so important here, since there is no big data here. I saw each task running in 45 sec. Do you say this is slow? What is your expectation? There might be scheduling/allocation delays as well, which are not in your job's scope, isn't it? – Ram Ghadiyaram Oct 03 '18 at 07:04
  • Actually, it is not 45s since in this screenshot, the program is not complete yet. It goes up to 10 min. I don't expect anything special. I'm just trying to understand the reason for that run time and if someone can give me tips to improve it if possible. – Minh Thai Oct 03 '18 at 07:16
  • Okay! There might be another reason apart from the program. I feel there is some delay in getting the SparkContext, or a scheduling delay, maybe. – Ram Ghadiyaram Oct 03 '18 at 07:18
  • Updated the answer after doing some research on parquet tools... In any case, before your business processing logic on your dataframes, you have to merge them. Please note that Hive has a merge-small-files option as mentioned in [this](https://stackoverflow.com/a/44050294/647053); since you are not using Hive I have not suggested that. Please feel free to ask questions on this... AFAIK there are no other options available... – Ram Ghadiyaram Oct 04 '18 at 07:25
  • Also, can you pinpoint in the Spark UI where exactly it is taking more time? Based on that we can discuss whether there is any scope here. But I feel it is a natural thing; being on the other side, it is very difficult to tell what exactly might have happened there. – Ram Ghadiyaram Oct 04 '18 at 07:51
  • @minh-thai: any other elegant solutions you found? – Ram Ghadiyaram Nov 28 '18 at 20:48
  • I found that the small file size is because it is heavily compressed. The actual data is much bigger than that. Hence the slow runtime. – Minh Thai Nov 30 '18 at 02:10
  • Spark coalesce() is preferable over repartition() as the former will reuse existing partitions rather than unnecessarily creating new ones. For a generic approach, when you have no way to know whether the target number of partitions is higher or lower, you should use repartition() as it works for both use cases. – Mário de Sá Vera Mar 10 '20 at 14:17

This is a trade-off Spark currently has (and release 3.0 should solve it): the number of tasks maps 1:1 to the number of output files, so the greater the number of tasks the better for performance, but it is really not ideal from a partitioning perspective, as the files might be quite small in that case.

Another issue is that, most of the time, the final repartitioned dataset will grow in volume, as the compression algorithms no longer have information about keys. For real-life Big Data that is a major problem, as the disk space occupancy grows considerably. That is particularly true for very nested datasets.

A solution for that is to flatten the datasets down into simple schemas, so that we can take advantage of the compression algorithms every time we write out to disk; see the sketch below.
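A minimal sketch of what that flattening could look like, assuming a hypothetical nested schema (id, user.name, user.address.city, user.address.zip) rather than any real dataset from the question:

import org.apache.spark.sql.functions.col

// hypothetical nested input: id, user: struct<name, address: struct<city, zip>>
val nested = spark.read.parquet("input_nested")
val flat = nested.select(
  col("id"),
  col("user.name").as("user_name"),
  col("user.address.city").as("user_address_city"),
  col("user.address.zip").as("user_address_zip")
)
// the flattened, simple-schema dataset compresses better when written out
flat.repartition(2).write.mode("overwrite").parquet("output_flat")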

Hope that helps!

– Mário de Sá Vera