
I am writing data as Parquet files as below -

df.repartitionByRange($"key", rand)
  .write
  .option("maxRecordsPerFile", 5000)
  .partitionBy("key")
  .parquet("somelocation")

I have used a string column (key) for partitioning; it holds a city name, and most of my filters are based on it.

Even after specifying maxRecordsPerFile, multiple small files (tens or hundreds of records each) are being created inside a single partition folder.

drlol
  • What is the format of your input and the format of your output? Please provide some more information. – sathya Jul 27 '20 at 10:52
  • @smart_coder I am reading a DataFrame from Parquet files and storing the same data, with partitioning, in Parquet format – drlol Jul 27 '20 at 11:46
  • Could you update the question with the partition column provided? – sathya Jul 27 '20 at 11:48
  • I have used maxRecordsPerFile so that I can limit the number of records per file, but in one partition (say NY) multiple files are being created with only 50 or 100 records each. – drlol Jul 27 '20 at 11:52
  • Does this answer your question? [Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?](https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby) – mazaneicha Jul 27 '20 at 13:54
  • Not exactly; my question here is: I am using maxRecordsPerFile, so why are small files being created by repartitionByRange? – drlol Jul 27 '20 at 14:13
  • Hmmm... What language is that? I mean, I think in most languages the repartition signature is `repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]`. Having `$"key"` as the first argument doesn't look like a good match, no? – mazaneicha Jul 27 '20 at 18:22
  • I think @mazaneicha's comment is highly relevant – thebluephantom Jul 27 '20 at 20:10
  • repartitionByRange and repartition with a random key are well explained in the link below: https://medium.com/airbnb-engineering/on-spark-hive-and-small-files-an-in-depth-look-at-spark-partitioning-strategies-a9a364f908 – drlol Jul 29 '20 at 06:12

1 Answer


AFAIK, the use cases below may help solve your problem.

Terminology:

1. maxRecordsPerFile - limits the maximum number of records written per file.

2. repartitionByRange(10, $"id")

  • signature: repartitionByRange(numPartitions: Int, partitionExprs: Column*)
  • creates numPartitions memory partitions by range-splitting the rows on partitionExprs into roughly equal chunks
  • memory-level partitioning; it can also improve compression when the data is written out to disk
  • to lay the data out on disk properly, you'll almost always need to repartition it in memory first (see the sketch after this list)

3. partitionBy("column to partition the output directories by")

  • DataFrameWriter method that controls whether the data is written to disk in folders, one per distinct value of the given column(s). By default, Spark does not write data into nested folders.
  • disk-level partitioning
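
Regarding repartitionByRange and the signature discussion in the comments: Dataset actually has two overloads, a columns-only one (the partition count then falls back to spark.sql.shuffle.partitions) and one taking an explicit numPartitions, so passing $"key" as the first argument is valid. A minimal sketch of both, assuming a SparkSession named spark (as in spark-shell or a Databricks notebook):

import org.apache.spark.sql.functions.rand
import spark.implicits._   // for the $"col" syntax

val sample = spark.range(1000)

// Overload 1: columns only - the target partition count comes from
// spark.sql.shuffle.partitions (200 by default).
val byColumnsOnly = sample.repartitionByRange($"id", rand())
println(byColumnsOnly.rdd.getNumPartitions)   // typically 200

// Overload 2: explicit partition count plus range columns.
val byExplicitCount = sample.repartitionByRange(10, $"id")
println(byExplicitCount.rdd.getNumPartitions) // 10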

case 1: input rows = 1000, repartition count = 10, maxRecordsPerFile = inputRows / repartitionCount = 1000 / 10 = 100. This leads to 10 part-xxxxx files with an equal number of records (100 records in each file) within the disk-level partition directory (partitioncol=1).

import org.apache.spark.sql.functions.lit
import spark.implicits._   // for the $"id" syntax (assumes a SparkSession named spark)

val df = spark.range(1000)
val df1 = df.withColumn("partitioncol", lit("1"))   // single disk-partition value

// 10 memory partitions, 100-record cap per file => 10 files of 100 records each
df1.repartitionByRange(10, $"id").write.mode("overwrite").option("maxRecordsPerFile", 100).partitionBy("partitioncol").parquet("/FileStore/import-stage/all4")
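
To verify case 1, a quick sketch using the Hadoop FileSystem API (assuming the default filesystem resolves the example path above; partitioncol=1 is the directory created by partitionBy):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partFiles = fs
  .listStatus(new Path("/FileStore/import-stage/all4/partitioncol=1"))
  .filter(_.getPath.getName.startsWith("part-"))
println(s"part files written: ${partFiles.length}")   // expected: 10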

case 2: input rows = 1000, repartition count = 10, maxRecordsPerFile > inputRows / repartitionCount (for example 1000). This again leads to 10 part-xxxxx files with an equal number of records (100 records in each file) within the disk-level partition directory (partitioncol=1), because each memory partition holds only 100 records, well below the cap.

import org.apache.spark.sql.functions.lit
import spark.implicits._

val df = spark.range(1000)
val df1 = df.withColumn("partitioncol", lit("1"))

// the 1000-record cap exceeds the 100 records per memory partition, so it has no effect
df1.repartitionByRange(10, $"id").write.mode("overwrite").option("maxRecordsPerFile", 1000).partitionBy("partitioncol").parquet("/FileStore/import-stage/all4")

case 3: input rows = 1000, repartition count = 10, maxRecordsPerFile < inputRows / repartitionCount, for example 10. This leads to 100 part-xxxxx files with an equal number of records (10 records in each file) within the disk-level partition directory (partitioncol=1), because each 100-record memory partition is split into 10 files.

import org.apache.spark.sql.functions.lit
import spark.implicits._

val df = spark.range(1000)
val df1 = df.withColumn("partitioncol", lit("1"))

// a 10-record cap splits each 100-record memory partition into 10 files => 100 files in total
df1.repartitionByRange(10, $"id").write.mode("overwrite").option("maxRecordsPerFile", 10).partitionBy("partitioncol").parquet("/FileStore/import-stage/all4")
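
A complementary sanity check for case 3 (a sketch, not part of the original answer): read the output back and count rows per physical file with input_file_name; every file should report at most 10 rows.

import org.apache.spark.sql.functions.input_file_name

spark.read.parquet("/FileStore/import-stage/all4")
  .groupBy(input_file_name().as("file"))
  .count()
  .orderBy("file")
  .show(false)
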
sathya
  • Thanks for the explanation!! Before I accept the answer, can you please explain the case where the partitionBy column and the repartitionByRange column are the same? As in my question, it is the key column. – drlol Jul 28 '20 at 05:57
  • What if I add a random column as part of the partition expression in repartitionByRange? – drlol Jul 28 '20 at 09:29
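
For what it's worth, a sketch of the pattern the last two comments ask about, illustrative only and not part of the answer above: repartitioning on the same column that partitionBy uses sends all rows of a given key to a single memory partition, so each key=... directory receives one file, split further only when maxRecordsPerFile is exceeded. The input path is hypothetical; df, key and somelocation mirror the names in the question.

import spark.implicits._

val df = spark.read.parquet("someinputlocation")   // hypothetical input path

df.repartition($"key")          // hash-partition: all rows sharing a key land in the same memory partition
  .write
  .option("maxRecordsPerFile", 5000)
  .partitionBy("key")
  .parquet("somelocation")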