
I am trying to read a subset of a dataset by using a pushdown predicate. My input dataset consists of 1.2 TB and 43,436 parquet files stored on S3. With the pushdown predicate I am supposed to read 1/4 of the data.
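
For context, a minimal sketch of the kind of read I am doing (the bucket path and filter column below are placeholders, not my actual ones):

    // spark-shell sketch; assumes a SparkSession named `spark`
    import org.apache.spark.sql.functions.col

    val df = spark.read
      .parquet("s3://my-bucket/dataset/")          // ~43k small parquet files
      .filter(col("event_date") >= "2020-01-01")   // predicate pushed down to the Parquet reader
    df.count()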

Looking at the Spark UI, I see that the job actually reads 1/4 of the data (300 GB), but there are still 43,436 partitions in the first stage of the job. However, only 1/4 of these partitions have data; the other 3/4 are empty (check the median input size in the attached screenshots).

I was expecting Spark to create partitions only for the non-empty ones. I am seeing a 20% performance overhead when reading the whole dataset with the pushdown predicate compared to another job reading the pre-filtered dataset (1/4 of the data) directly. I suspect that this overhead is due to the huge number of empty partitions/tasks I have in my first stage, so I have two questions:

  1. Is there any workaround to avoid these empty partitions?
  2. Can you think of any other reason for the overhead? Maybe the pushdown filter execution is naturally a little bit slow?

Thank you in advance

[Screenshot: Spark UI showing the data read]

3 Answers


Using S3 Select, you can retrieve only a subset of data.

With Amazon EMR release version 5.17.0 and later, you can use S3 Select with Spark on Amazon EMR. S3 Select allows applications to retrieve only a subset of data from an object.
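
A minimal sketch of what that looks like (the format name is taken from the EMR documentation; note that the EMR S3 Select integration is documented for CSV and JSON sources, not Parquet, and the path/column are placeholders):

    // EMR spark-shell sketch
    import org.apache.spark.sql.functions.col

    val df = spark.read
      .format("s3selectCSV")                       // or "s3selectJson"; Parquet is not covered
      .load("s3://my-bucket/csv-data/")
    df.filter(col("status") === "active").show()   // eligible filters are executed by S3 Select itself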

Otherwise, S3 acts as an object store, in which case an entire object has to be read. In your case, you would have to read the full content of all files and filter them on the client side.

There is actually a very similar question, where by testing you can see that:

The input size was always the same as the Spark job that processed all of the data

You can also see this question about optimizing reads of Parquet files from S3.

Yosi Dahari
  • Thank you @Yosi for your quick response: 1- So the information in the Spark UI (input data = 300 GB) is not precise, I guess? 2- What confuses me is that selecting a few columns from a dataset is quite fast. Do you think the same applies here: when I select only one column, is the whole file transferred over the network and the filter only applied on the client? – Wassim Maaoui Jun 25 '20 at 18:11
  • 1. Well, that's not enough info. 300 GB of what? Please be specific; you might be confusing it with shuffle. 2. Please see: https://stackoverflow.com/a/41609999/2628137 I think you are always reading all the data. – Yosi Dahari Jun 25 '20 at 18:18
  • I was speaking about what the Spark UI shows for how much data the job read => https://i.stack.imgur.com/rhAFS.png. According to the Spark UI, only 300 GB of data was read, which represents 1/4 of the whole dataset. – Wassim Maaoui Jun 25 '20 at 18:52
  • 1
    how could a big file on S3 be read by many spark workers and into many partitions if we can't read a bloc from a file? – Wassim Maaoui Jun 25 '20 at 19:18
  • So on HDFS parquet reading is far more efficient than S3? – thebluephantom Jun 25 '20 at 19:45
  • Some sites I have been at will not touch S3 – the techies, that is. Management has no clue and thinks it is all great. – thebluephantom Jun 25 '20 at 19:49
  • 1
    According to AWS S3 API documentation, you can ask s3 for a range of a file using the HTTP Range Headers: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html – Radhwane Chebaane Jun 26 '20 at 15:39
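
For reference, the ranged read mentioned in the last comment looks roughly like this with the AWS SDK for Java v1 (bucket and key are placeholders); this is how a reader such as the Hadoop S3A connector can fetch a Parquet footer or a single row group without downloading the whole object:

    // Sketch with the AWS SDK for Java v1 (assumed to be on the classpath)
    import com.amazonaws.services.s3.AmazonS3ClientBuilder
    import com.amazonaws.services.s3.model.GetObjectRequest

    val s3  = AmazonS3ClientBuilder.defaultClient()
    val req = new GetObjectRequest("my-bucket", "dataset/part-00000.parquet")
      .withRange(0, 1023)                          // HTTP Range header: fetch only the first 1 KiB
    val obj = s3.getObject(req)                    // only the requested byte range is transferred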

It seems like your files are rather small: 1.2 TB / 43,436 ≈ 30 MB. So you may want to look at increasing spark.sql.files.maxPartitionBytes to see if it reduces the total number of partitions. I don't have much experience with S3, so I'm not sure whether it's going to help, given this note in its description:

The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
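
A minimal sketch of the tweak (the value and path are placeholders):

    // spark-shell sketch; assumes a SparkSession named `spark`
    spark.conf.set("spark.sql.files.maxPartitionBytes", 512L * 1024 * 1024)  // e.g. 512 MB instead of the 128 MB default
    val df = spark.read.parquet("s3://my-bucket/dataset/")
    println(df.rdd.getNumPartitions)               // should drop if more small files get packed per partition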

mazaneicha
  • Thank you for your response. I thought this parameter was 128 MB by default, but Spark is not merging my small partitions. I will give it a try anyway today – Wassim Maaoui Jun 29 '20 at 07:06
  • 1
    Finally digging the meaning of that parameter and having a look to spark code gave me more insights, I posted more details in an answer. Thank you for your help – Wassim Maaoui Jul 01 '20 at 14:54

Empty partitions: It seems that Spark (2.4.5) tries to build partitions whose size is ≈ spark.sql.files.maxPartitionBytes (default 128 MB) by packing many files into one partition (source code here). However, it does this work before running the job, so it can't know that 3/4 of the files will not output any data once the pushed-down predicate is applied. For the partitions that only contain files whose rows are all filtered out, I end up with empty partitions. This also explains why my max partition input size is 44 MB and not 128 MB: no partition happened, by chance, to contain only files that passed the pushdown filter.
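
To illustrate, here is a simplified sketch of that packing logic (not the actual Spark source; the file sizes are made up):

    // Greedy packing of files into partitions, done at planning time,
    // i.e. before the pushed-down predicate has filtered anything.
    case class FileInfo(path: String, length: Long)

    val maxPartitionBytes = 128L * 1024 * 1024     // spark.sql.files.maxPartitionBytes
    val openCostInBytes   = 4L * 1024 * 1024       // spark.sql.files.openCostInBytes
    val files = (1 to 8).map(i => FileInfo(s"part-$i.parquet", 30L * 1024 * 1024))

    val partitions  = scala.collection.mutable.ArrayBuffer(scala.collection.mutable.ArrayBuffer.empty[FileInfo])
    var currentSize = 0L
    for (f <- files.sortBy(-_.length)) {
      if (currentSize + f.length > maxPartitionBytes && partitions.last.nonEmpty) {
        partitions += scala.collection.mutable.ArrayBuffer.empty[FileInfo]  // close partition, start a new one
        currentSize = 0L
      }
      partitions.last += f
      currentSize += f.length + openCostInBytes    // each file also "costs" its open cost
    }
    println(partitions.map(_.map(_.path)))
    // A partition can end up holding only files whose rows are later filtered out,
    // which is what produces the empty tasks.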

20% overhead: Finally, this is not due to the empty partitions. I managed to get far fewer empty partitions by setting spark.sql.files.maxPartitionBytes to 1 GB, but it didn't improve reading. I think that the overhead is due to opening many files and reading their metadata: Spark estimates that opening a file is equivalent to reading 4 MB (spark.sql.files.openCostInBytes). So opening that many files, even if the filter means most of them won't actually be read, is not negligible.
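
As a rough back-of-the-envelope, treating Spark's own open-cost heuristic as a proxy for the real overhead (file count from my dataset, 4 MB is the default open cost):

    val fileCount     = 43436
    val openCostBytes = 4L * 1024 * 1024
    val equivalentGB  = fileCount * openCostBytes / 1e9
    println(f"estimated open cost ≈ $equivalentGB%.0f GB-equivalent")  // ≈ 182 GB on top of the ~300 GB actually read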