
Context

Spark 2.0.1, spark-submit in cluster mode. I am reading a parquet file from hdfs:

val spark = SparkSession.builder
      .appName("myApp")
      .config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
      .config("spark.sql.sources.bucketing.enabled", true)
      .enableHiveSupport()
      .getOrCreate()

val df = spark.read
              .format("parquet")
              .load("hdfs://XXX.XX.X.XX/myParquetFile")

I am saving the df to a hive table with 50 buckets grouped by userid:

df.write
   .bucketBy(50, "userid")
   .saveAsTable("myHiveTable")

Now, when I look into the hive warehouse on my hdfs at /user/hive/warehouse, there is a folder named myHiveTable. Inside it are a bunch of part-*.parquet files. I would expect there to be 50 files. But no, there are 3201 files! There are 64 files per partition, why? The number of files per partition differs between the files I saved as hive tables. All the files are very small, just tens of KB each!

Let me add that the number of distinct userid values in myParquetFile is about 1,000,000.

Question

Why are there 3201 files in the folder instead of 50? What are they?

When I read this table back into DataFrame and print number of partitions:

val df2 = spark.sql("SELECT * FROM myHiveTable") 
println(df2.rdd.getNumPartitions)

The number of partitions is correctly 50, and I confirmed that the data is correctly partitioned by userid.

For one of my large datasets (3 TB) I created a table with 1000 partitions, which created literally about a million files! That exceeds the directory item limit of 1048576 and gives org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException

Question

What does the number of files created depend on?

Question

Is there a way to limit number of files created?

Question

Should I worry about these files? Does having all these files hurt performance on df2? It is often said that we should not create too many partitions because it is problematic.

Question

I found this post, HIVE Dynamic Partitioning tips, which says that the number of files might be related to the number of mappers. It suggests using distribute by while inserting into the hive table. How could I do that in Spark?

Question

If the problem is indeed as described in the link above, then in How to control the file numbers of hive table after inserting data on MapR-FS they suggest using options such as hive.merge.mapfiles or hive.merge.mapredfiles to merge all the small files after the map reduce job. Are there equivalent options in Spark?

astro_asz
  • You have six good questions here but none of them were actually answered. Have you found answers to your questions (rather than just a solution or workaround)? I would like to know how to solve the same issue without statically configuring all those Hive parameters. – joe Mar 07 '19 at 16:05
  • @joe Nope, I gave up on this. I believe the solution proposed by Ravikumar (In hive command line to create bucketed table and insert data) might work, but we had a problem with installation of hadoop on our cluster and I could not test it properly. – astro_asz Mar 07 '19 at 16:18

4 Answers


Please use Spark SQL, which will use HiveContext to write data into the Hive table, so it will use the number of buckets that you configured in the table schema.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("hive.execution.engine", "tez")
  .config("hive.exec.max.dynamic.partitions", "400")
  .config("hive.exec.max.dynamic.partitions.pernode", "400")
  .config("hive.enforce.bucketing", "true")
  .config("hive.optimize.sort.dynamic.partition", "true")
  .config("hive.vectorized.execution.enabled", "true")
  .config("hive.enforce.sorting", "true")
  .enableHiveSupport()
  .getOrCreate()

// myParquetFile must be visible to Spark SQL as a table or temp view
spark.sql("insert into hiveTableName partition (partition_column) select * from myParquetFile")

Spark's bucketing implementation does not honor the specified number of buckets as a file count. Each partition writes its own separate files, hence you end up with a lot of files for each bucket.

Please refer to this link: https://www.slideshare.net/databricks/hive-bucketing-in-apache-spark-with-tejas-patil

Hope this helps.

Ravi

Ravikumar
  • The Spark community is working on improving bucketing: https://issues.apache.org/jira/browse/SPARK-19256 – Ravikumar Feb 02 '18 at 22:00
  • Thanks @Ravikumar for the answer! I will be testing today. Just to clarify, before inserting to table `insert into hiveTableName` I need to `create table`? And `config("hive.exec.max.dynamic.partitions","400")`, `config("hive.exec.max.dynamic.partitions.pernode","400")` will define number of buckets? How do I specify partitionBy column? – astro_asz Feb 05 '18 at 09:23
  • Yes, you need to create the hive table before executing this. Partitioning is specified in the schema definition: create external table hivetable ( objecti1 string, col2 string, col3 string ) PARTITIONED BY (currentbatch string) CLUSTERED BY (col2) INTO 8 BUCKETS STORED AS PARQUET LOCATION 's3://s3_table_name' – Ravikumar Feb 06 '18 at 16:57
  • @Ravikumar If the partitioning column is mentioned during the hive table creation, why do we need to mention it again in the spark.sql insert into code? – Ayan Biswas Oct 03 '18 at 10:22
  • @AyanBiswas When inserting records into a partitioned table, the statement has to say which partition each record goes to. Per the INSERT specification, for a dynamic partition you give only the partition column name; otherwise you hard-code the partition value. As of Hive 3.0.0 (HIVE-19083) there is no need to specify dynamic partition columns: Hive will automatically generate the partition specification if it is not specified. https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingvaluesintotablesfromSQL – Ravikumar Oct 03 '18 at 21:46
  • @Ravikumar I tried the strategy of creating an external hive table and then using spark sql to insert the data, but the insert into statement for spark sql is failing. I have posted a query here: https://stackoverflow.com/questions/52638606/spark-sql-insert-into-external-hive-table-error – Ayan Biswas Oct 04 '18 at 13:08
  • @Ravikumar Can we later select or view, in Hive, the data of tables created with the "bucketBy" method? When I use the bucketBy method, the Spark schema is not compatible with Hive. df2.write.format('orc').bucketBy(20,"GUDB05_VIN_MODYR_C").mode("overwrite").saveAsTable('udb.bucketed_table'). It shows me the table like this: col | array | from deserializer – vikrant rana Nov 21 '18 at 06:30

I was able to find a workaround (on Spark 2.1). It solves the number of files problem but might have some performance implications.

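import org.apache.spark.sql.functions.{hash, lit, pmod}
import spark.implicits._  // enables the $"col" syntax; assumes a SparkSession named spark

// numBuckets and fmt are placeholders for your bucket count and output format
dataframe
  // bucket id: positive mod of the hash of the bucket column
  .withColumn("bucket", pmod(hash($"bucketColumn"), lit(numBuckets)))
  // all rows of a given bucket end up in the same partition
  .repartition(numBuckets, $"bucket")
  .write
  .format(fmt)
  .bucketBy(numBuckets, "bucketColumn")
  .sortBy("bucketColumn")
  .option("path", "/path/to/your/table")
  .saveAsTable("table_name")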

I think spark's bucketing algorithm does a positive mod of MurmurHash3 of the bucket column value. This simply replicates that logic and repartitions the data so that each partition contains all the data for a bucket.
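
To illustrate what "positive mod" means here, the plain-Scala sketch below compares the ordinary % operator with a pmod-style helper. The object name PmodDemo, the helper pmod and the sample hash values are made up for this example; no Spark code and not Spark's actual hash function are involved.

```scala
object PmodDemo {
  // Hypothetical helper mirroring the idea of a "positive mod":
  // the result always lands in 0 until n, even for negative inputs.
  def pmod(value: Int, n: Int): Int = {
    val r = value % n // may be negative when value is negative
    if (r < 0) r + n else r
  }

  // Made-up hash values standing in for hashes of a bucket column.
  val sampleHashes: Seq[Int] = Seq(-7, -1, 0, 3, 12)
  val numBuckets = 5

  val plainMod: Seq[Int] = sampleHashes.map(_ % numBuckets)
  val bucketIds: Seq[Int] = sampleHashes.map(pmod(_, numBuckets))

  def main(args: Array[String]): Unit = {
    println(plainMod)  // List(-2, -1, 0, 3, 2)
    println(bucketIds) // List(3, 4, 0, 3, 2)
  }
}
```

A plain % would give negative "bucket ids" for negative hashes, which is why a positive mod is needed to map every row into one of the numBuckets buckets.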

You can do the same with partitioning + bucketing.

dataframe
  .withColumn("bucket", pmod(hash($"bucketColumn"), lit(numBuckets)))
  .repartition(numBuckets, $"partitionColumn", $"bucket")
  .write
  .format(fmt)
  .partitionBy("partitionColumn")
  .bucketBy(numBuckets, "bucketColumn")
  .sortBy("bucketColumn")
  .option("path", "/path/to/your/table")
  .saveAsTable("table_name")

Tested with 3 partitions and 5 buckets locally using csv format (both partition and bucket columns are just numbers):

$ tree .
.
├── _SUCCESS
├── partitionColumn=0
│   ├── bucket=0
│   │   └── part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
│   ├── bucket=1
│   │   └── part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
│   ├── bucket=2
│   │   └── part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
│   ├── bucket=3
│   │   └── part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
│   └── bucket=4
│       └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
├── partitionColumn=1
│   ├── bucket=0
│   │   └── part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
│   ├── bucket=1
│   │   └── part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
│   ├── bucket=2
│   │   └── part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
│   ├── bucket=3
│   │   └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
│   └── bucket=4
│       └── part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
└── partitionColumn=2
    ├── bucket=0
    │   └── part-00000-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
    ├── bucket=1
    │   └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
    ├── bucket=2
    │   └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
    ├── bucket=3
    │   └── part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
    └── bucket=4
        └── part-00000-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv

Here's the bucket=0 for all 3 partitions (you can see that they are all the same values):

$ paste partitionColumn=0/bucket=0/part-00004-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv partitionColumn=1/bucket=0/part-00002-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv partitionColumn=2/bucket=0/part-00000-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv | head
0   0   0
4   4   4
6   6   6
16  16  16
18  18  18
20  20  20
26  26  26
27  27  27
29  29  29
32  32  32

I actually liked the extra bucket index. But if you don't, you can drop the bucket column right before write and you'll get the numBuckets number of files per partition.

Bill Kuang
  • In this approach, is there a need to create an external hive table? – Ayan Biswas Oct 08 '18 at 10:18
  • No, there isn't. The idea behind the approach is to repartition in such a way that all data belonging to the same partition and/or bucket will be on the same executor BEFORE calling "write". – Bill Kuang Nov 15 '18 at 20:16
  • Is Spark still using different hashing algorithms for partitioning and bucketing? The principle of least astonishment, just saying. – Molotch Sep 09 '19 at 09:23
  • What should we do if we have more than one column in the bucketBy statement? – Nebi M Aydin Apr 12 '23 at 19:21

These questions also popped up in my mind when I saw too many files, so I searched and found this:

"Unlike bucketing in Apache Hive, Spark SQL creates the bucket files per the number of buckets and partitions. In other words, the number of bucketing files is the number of buckets multiplied by the number of task writers (one per partition)."

Source: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html

I think this answers your question about why there are this many files.
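
As a rough worked example of that rule, the plain-Scala sketch below multiplies writing tasks by buckets. The names BucketFileCount and maxBucketFiles are made up for illustration. It assumes that the 64 files per bucket seen in the question correspond to 64 writing tasks, and the task count of 1000 for the large table is purely an assumption, since the question does not state it.

```scala
object BucketFileCount {
  // Upper bound on bucket files for one write: each writing task can emit
  // one file per bucket it holds rows for.
  def maxBucketFiles(numTasks: Int, numBuckets: Int): Long =
    numTasks.toLong * numBuckets

  def main(args: Array[String]): Unit = {
    // Numbers from the question: 50 buckets, 64 files observed per bucket.
    println(maxBucketFiles(numTasks = 64, numBuckets = 50)) // 3200
    // Assumed task count of 1000 for the 1000-bucket table.
    println(maxBucketFiles(numTasks = 1000, numBuckets = 1000)) // 1000000
  }
}
```

The first result is one short of the 3201 files reported; the extra file could be a marker file such as _SUCCESS, though the question does not say. The second result sits just under the 1048576 directory item limit mentioned in the question.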

Your question no. 2 can be answered like this: if we manage the number of partitions with repartition, resources permitting, we can limit the number of files created.
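
To make the effect of repartitioning concrete, here is a small plain-Scala simulation (no Spark involved). It counts one file per bucket that each task holds rows for, first with rows spread over tasks arbitrarily and then with rows grouped by bucket. The names RepartitionSimulation, bucketOf and filesWritten, the task and bucket counts, and the simple mod used in place of Spark's hash are all assumptions for illustration.

```scala
object RepartitionSimulation {
  val numBuckets = 5
  val numTasks = 8
  val userIds: Range = 0 until 1000

  // Bucket id for a key; a simple non-negative mod stands in for Spark's hash.
  def bucketOf(userId: Int): Int = Math.floorMod(userId, numBuckets)

  // Count files if every task writes one file per bucket it holds rows for.
  def filesWritten(rowsByTask: Map[Int, Seq[Int]]): Int =
    rowsByTask.values.map(rows => rows.map(bucketOf).distinct.size).sum

  // Before: rows are spread over tasks with no relation to the bucket column.
  val arbitrary: Map[Int, Seq[Int]] = userIds.groupBy(_ % numTasks)
  // After: rows are regrouped so that each task holds exactly one bucket.
  val byBucket: Map[Int, Seq[Int]] = userIds.groupBy(bucketOf)

  def main(args: Array[String]): Unit = {
    println(filesWritten(arbitrary)) // 40 = 8 tasks x 5 buckets
    println(filesWritten(byBucket))  // 5  = one file per bucket
  }
}
```

In the arbitrary layout every task sees every bucket, so the file count is tasks times buckets; once rows are grouped by bucket first, the count drops to the number of buckets.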

Athar

I had a similar problem, and the article Best Practices for Bucketing in Spark SQL helped me resolve it. Use the repartition approach as shown below.

df.repartition(expr("pmod(hash(user_id), <<number of buckets>>)"))
  .write
  .mode(saving_mode)
  .bucketBy(<<number of buckets>>, 'user_id')
  .option("path", output_path)
  .saveAsTable(table_name)

Please see the article for more details.

Sam