AFAIK, the use cases below may help solve your problem.
Terminology:
1. maxRecordsPerFile - limits the maximum number of records written per file.
2. repartitionByRange(10, $"id")
- repartitionByRange(numPartitions: Int, partitionExprs: Column*)
- which will create numPartitions by splitting the partitionExprs into partitionExprs/numPartitions equal records splits.
- improve compression when writing data out to disk,
- memory partitioning based
- in order to write data on disk properly, you’ll almost always need to repartition the data in memory first.
3. partitionBy("column(s) to partition the output directories by")
- DataFrameWriter method that writes the data into nested folders on disk, one folder per distinct value of the given column(s); by default Spark does not write data into nested folders.
- this is disk-level partitioning.
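To see what the memory-level partitioning looks like before anything is written to disk, here is a minimal sketch (assuming spark.range, so the column is "id"; spark_partition_id is a built-in Spark SQL function):

// Sketch: how repartitionByRange(10, $"id") spreads 1000 rows across in-memory partitions.
import org.apache.spark.sql.functions.spark_partition_id
import spark.implicits._

val ranged = spark.range(1000).repartitionByRange(10, $"id")

// Count rows per in-memory partition; each should hold roughly 100 records.
ranged.groupBy(spark_partition_id().as("memPartition")).count().orderBy("memPartition").show()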
case 1: input rows = 1000, repartition count = 10, maxRecordsPerFile = inputRows / repartitionCount = 1000 / 10 = 100.
Leads to 10 part-xxxxx files with an equal number of records (100 records in each file) within the disk-level partition directory (partitioncol=1).
import org.apache.spark.sql.functions.lit
import spark.implicits._   // enables the $"id" column syntax

val df = spark.range(1000)                           // 1000 input rows, column "id"
val df1 = df.withColumn("partitioncol", lit("1"))    // constant value => a single disk-level partition

// 10 memory partitions of ~100 rows each; cap of 100 records per file => 10 part files
df1.repartitionByRange(10, $"id").write.option("maxRecordsPerFile", 100).partitionBy("partitioncol").parquet("/FileStore/import-stage/all4")
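If you want to confirm the resulting layout on disk, a quick check along these lines should work (this is just an illustrative sketch using the Hadoop FileSystem API; the path and the partitioncol=1 directory come from the example above):

// Sketch: count the part files written under the partitioncol=1 directory.
import org.apache.hadoop.fs.{FileSystem, Path}

val outDir = new Path("/FileStore/import-stage/all4/partitioncol=1")
val fs = outDir.getFileSystem(spark.sparkContext.hadoopConfiguration)
val partFiles = fs.listStatus(outDir).map(_.getPath.getName).filter(_.startsWith("part-"))
println(s"part files: ${partFiles.length}")   // expected: 10 for case 1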
case 2: input rows = 1000, repartition count = 10, maxRecordsPerFile > inputRows / repartitionCount, e.g. 1000.
Again leads to 10 part-xxxxx files with an equal number of records (100 records in each file) within the disk-level partition directory (partitioncol=1), because each memory partition holds only 100 records, so the 1000-record cap is never hit.
import org.apache.spark.sql.functions.lit
import spark.implicits._   // enables the $"id" column syntax

val df = spark.range(1000)
val df1 = df.withColumn("partitioncol", lit("1"))

// each memory partition holds only ~100 rows, so the 1000-record cap never triggers a split => still 10 part files
df1.repartitionByRange(10, $"id").write.option("maxRecordsPerFile", 1000).partitionBy("partitioncol").parquet("/FileStore/import-stage/all4")
case 3: input rows = 1000, repartition count = 10, maxRecordsPerFile < inputRows / repartitionCount, e.g. 10.
Leads to 100 part-xxxxx files with an equal number of records (10 records in each file) within the disk-level partition directory (partitioncol=1): each of the 10 memory partitions is split into 10 files.
import org.apache.spark.sql.functions.lit
import spark.implicits._   // enables the $"id" column syntax

val df = spark.range(1000)
val df1 = df.withColumn("partitioncol", lit("1"))

// each ~100-row memory partition is split into files of 10 records => 10 * 10 = 100 part files
df1.repartitionByRange(10, $"id").write.option("maxRecordsPerFile", 10).partitionBy("partitioncol").parquet("/FileStore/import-stage/all4")
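Putting the three cases together: per disk-level partition directory, the file count works out to roughly numPartitions * ceil(recordsPerPartition / maxRecordsPerFile), assuming the rows are spread evenly. A tiny sketch of that arithmetic (the helper name is just for illustration):

// Rough estimate of the number of part files, assuming an even spread of rows
// across memory partitions and a single disk-level partition value.
def estimatedPartFiles(inputRows: Long, numPartitions: Int, maxRecordsPerFile: Long): Long = {
  val recordsPerPartition = math.ceil(inputRows.toDouble / numPartitions).toLong
  numPartitions * math.ceil(recordsPerPartition.toDouble / maxRecordsPerFile).toLong
}

println(estimatedPartFiles(1000, 10, 100))   // case 1 -> 10
println(estimatedPartFiles(1000, 10, 1000))  // case 2 -> 10
println(estimatedPartFiles(1000, 10, 10))    // case 3 -> 100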