I have a DataFrame with columns "month", "year", "day", "timestamp", and other columns, which was read from a Parquet table partitioned by "year", "month" and "day". I need the data partitioned in such a way that each partition has data corresponding to only one ("year", "month", "day") combination.
I would then run sortWithinPartitions on "timestamp" and process the data sequentially within each partition (i.e. via mapPartitions). The issue is that repartition on columns does not ensure that a partition will have rows with only one combination of "month", "year" and "day": it hashes the column values and assigns each row to a partition modulo the partition count, so two distinct keys can collide in the same partition.
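A quick way to see the collisions (a sketch, assuming the partition columns are strings as above):

import org.apache.spark.sql.functions.{countDistinct, spark_partition_id}

// Count the distinct (year, month, day) keys in each physical partition;
// any count above 1 means two keys collided in the same partition.
df.repartition($"year", $"month", $"day")
  .withColumn("pid", spark_partition_id())
  .groupBy($"pid")
  .agg(countDistinct($"year", $"month", $"day").alias("keys_in_partition"))
  .filter($"keys_in_partition" > 1)
  .show()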
To get around this I did:

df.repartition(MAX_INT, $"year", $"month", $"day")
  .sortWithinPartitions($"timestamp")
  .rdd
  .mapPartitions(sequential_processing_function)
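My understanding (an assumption, please correct me if I'm wrong) is that repartition(numPartitions, cols...) hash-partitions rows by the column values modulo numPartitions, so a huge numPartitions makes a collision between two keys unlikely but not impossible, and most of the resulting partitions will be empty.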
It is hard to verify that this works correctly. The question is: will this work as expected, i.e. will each partition hold the data for just a single combination of "year", "month" and "day"?
Here is what I tried based on @user6910411's comments:
// Collect the distinct (year, month, day) keys to the driver,
// encoded as a single space-separated string per key.
val keyList = df.select($"year", $"month", $"day")
  .distinct()
  .select(concat($"year", lit(" "), $"month", lit(" "), $"day")
    .alias("partition_key"))
  .rdd
  .map(x => x.getString(0))
  .collect()
// Map each key to its own partition index and broadcast the mapping.
val keyIndexMap = collection.mutable.Map[String, Long]()
for (i <- keyList.indices) keyIndexMap(keyList(i)) = i
val keyIndexMapBC = sc.broadcast(keyIndexMap)
// Needs: import org.apache.spark.Partitioner
class ExactPartitioner extends Partitioner {
  override def getPartition(key: Any): Int =
    keyIndexMapBC.value(key.asInstanceOf[String]).toInt

  override def numPartitions: Int = keyIndexMapBC.value.size
}
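By construction this partitioner sends each key to its own partition index, so a partition can never receive rows for two different (year, month, day) keys, and numPartitions equals the number of distinct keys.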
val df_partitioned = spark.createDataFrame(
  df.select("year", "month", "day", "timestamp", "other_columns")
    .rdd
    .map(row => (row.getAs[String]("year") + " " +
                 row.getAs[String]("month") + " " +
                 row.getAs[String]("day"), row))
    .partitionBy(new ExactPartitioner)
    .values,
  intermediate_data_schema)
With this, df_partitioned.rdd.partitions.size gives me the correct number of partitions. Again, how do I verify that everything went right and this is working as expected?
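One check I came up with (a sketch, again assuming the partition columns are strings): count the distinct keys seen in each partition; every non-empty partition should report exactly one.

// For every partition, count the distinct (year, month, day) keys it holds;
// the result should contain nothing but 1s (empty partitions are skipped).
val keysPerPartition = df_partitioned.rdd.mapPartitions { rows =>
  val keys = rows.map(r => (r.getAs[String]("year"),
                            r.getAs[String]("month"),
                            r.getAs[String]("day"))).toSet
  if (keys.isEmpty) Iterator.empty else Iterator(keys.size)
}.collect()

assert(keysPerPartition.forall(_ == 1),
  "some partition holds more than one (year, month, day) key")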