
I just started playing with Spark 2+ (version 2.3) and I observed something strange when looking at the Spark UI. I have a list of directories in an HDFS cluster containing 24000 small files in total.

When I run a Spark action on them, Spark 1.5 generates an individual task for each input file, as I have been used to until now. I know that each HDFS block (in my case, one small file is one block) generates one partition in Spark, and each partition is processed by an individual task.

Spark 1.5 UI screenshot

Also, the command `my_dataframe.rdd.getNumPartitions()` outputs 24000.

Now about Spark 2.3. On the same input, the command `my_dataframe.rdd.getNumPartitions()` outputs 1089, and the Spark UI also shows 1089 tasks for my Spark action. You can also see that the number of generated jobs is bigger in Spark 2.3 than in 1.5.

Spark 2.3 UI screenshot

The code is the same for both Spark versions (I had to change the dataframe, paths and column names a little, because it's code from work):

%pyspark
from pyspark.sql.functions import col

dataframe = sqlContext.\
                read.\
                parquet(path_to_my_files)
dataframe.rdd.getNumPartitions()
dataframe.\
    where((col("col1") == 21379051) & (col("col2") == 2281643649) & (col("col3") == 229939942)).\
    select("col1", "col2", "col3").\
    show(100, False)

Here are the physical plans generated by:

dataframe.where(...).select(...).explain(True)
Spark 1.5
== Physical Plan ==
Filter (((col1 = 21379051) && (col2 = 2281643649)) && (col3 = 229939942))
 Scan ParquetRelation[hdfs://cluster1ns/path_to_file][col1#27,col2#29L,col3#30L]
Code Generation: true

Spark 2.3
== Physical Plan ==
*(1) Project [col1#0, col2#2L, col3#3L]
+- *(1) Filter (((isnotnull(col1#0) && (col1#0 = 21383478)) && (col2 = 2281643641)) && (col3 = 229979603))
   +- *(1) FileScan parquet [col1,col2,col3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[hdfs://cluster1ns/path_to_file..., PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:bigint,col2:bigint,col3:bigint>....

The jobs above were generated from Zeppelin using PySpark. Has anyone else run into this situation with Spark 2.3? I have to say that I like the new way of processing multiple small files, but I would also like to understand the internal Spark changes behind it.

I searched on the internet and in the newest book, "Spark: The Definitive Guide", but didn't find any information about a new way for Spark to generate the physical plan for jobs.

If you have any links or information, they would be interesting to read. Thanks!

  • Can you please provide the job code? – addmeaning Mar 28 '18 at 10:36
  • Hi @addmeaning, I added the code to the post. Thanks! – Tudor Lapusan Mar 28 '18 at 10:56
  • The code is pretty straightforward. Can you try to run `dataframe.explain(True)` for both versions of Spark, to test whether the code translates to a different set of operations? – addmeaning Mar 28 '18 at 11:01
  • Nice idea. I ran explain(True) on the dataframe resulting from the select and added the physical plans (again with some code obfuscation). – Tudor Lapusan Mar 28 '18 at 11:22
  • 2.x [doesn't use Hadoop configuration to compute splits](https://stackoverflow.com/a/40703599/6910411). So this accounts for the number of partitions. – zero323 Mar 28 '18 at 11:59
  • Thanks @user6910411, that could be the explanation. I have to check it. But there seem to be more posts on the internet related to the "spark.sql.files.maxPartitionBytes" config property. – Tudor Lapusan Mar 28 '18 at 13:23

1 Answer


From the Spark 2.3 configuration documentation:

| Property Name | Default | Meaning |
| --- | --- | --- |
| spark.files.maxPartitionBytes | 134217728 (128 MB) | The maximum number of bytes to pack into a single partition when reading files. |
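
The table above quotes the core `spark.files.maxPartitionBytes` option; the comments under the question point at the SQL-side `spark.sql.files.maxPartitionBytes` (same 128 MB default), which is the one I'd expect to apply to `sqlContext.read.parquet`. Below is a minimal sketch, not verified on the cluster from the question, of how lowering it might bring the partition count back up; `path_to_my_files` is the placeholder path from the question and the exact resulting count is an assumption.

%pyspark
# Sketch only: in Spark 2.x the file-based data sources pack many small files
# into one partition, up to roughly spark.sql.files.maxPartitionBytes per
# partition (plus an open cost per file), instead of one partition per HDFS block.
sqlContext.setConf("spark.sql.files.maxPartitionBytes", "16777216")  # 16 MB instead of the 128 MB default
sqlContext.setConf("spark.sql.files.openCostInBytes", "4194304")     # default 4 MB, counted per file when packing

dataframe = sqlContext.\
                read.\
                parquet(path_to_my_files)   # placeholder path, as in the question
dataframe.rdd.getNumPartitions()            # expected to be higher than the 1089 seen with the defaults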