0

I have a dataframe with columns "month", "year", "day", "timestamp", ".... and other columns" which was read from a parquet table partitioned by "year", "month" and "day". I need the data partitioned in such a way that each partition has data corresponding to only one "year", "month", "day" combination.

I have a dataframe with columns "month", "year", "day", "timestamp", ".... and other columns" which was read from a parquet table partitioned by "year", "month" and "day". I need the data partitioned in such a way that each partition has data corresponding to only one "year", "month", "day" combination.

I would then run a sortWithinPartitions on timestamp and then process the data in a sequential manner in each partition (i.e. by mapPartitions). The issue is repartition on columns does not ensure that a partition will have rows with only one combination of "month", "year" and "day". To get around this I have done

df.repartition("year", "month", "day", MAX_INT)                                      
  .sortWithinPartitions($"timestamp")
  .rdd                                       
  .mapPartitions(sequential_processing_function)

It is difficult to verify easily if this is working correctly as expected.

Question is - will this work as expected, i.e. each partition will have just the data for a single combination of "year", "month", "day".

Here is what I tried out based on user @user6910411 comments

val keyList = (df.select($"year", $"month", $"day")
                 .distinct()
                 .select(concat($"year", lit(" "),
                                $"month", lit(" "),
                                $"day").alias("partition_key"))
                .rdd
                .map(x => x.getString(0))
                .collect())
val keyIndexMap = collection.mutable.Map[String, Long]()
for (i <- keyList.indices) keyIndexMap(keyList(i)) = i
var keyIndexMapBC = sc.broadcast(keyIndexMap)

class ExactPartitioner[V]() extends Partitioner {
  def getPartition(key: Any): Int = {
    return keyIndexMapBC.value(key.asInstanceOf[String]).toInt
  }

  def numPartitions(): Int = {
      return keyIndexMapBC.value.size
  }
}
val df_partitioned =
    spark.createDataFrame(df,                                                        
        .select("year", "month", "day", "timestamp", "other_columns")                                                                                                                  
        .rdd.map(row => (row.getAs[String]("year") + " " +                                                                         
                         row.getAs[String]("month") + " " +                                                                          
                         row.getAs[String]("day"), row))
        .partitionBy(new ExactPartitioner).values,                                                       
        intermediate_data_schema)

With this df_partitioned.rdd.partitions.size gives me the correct number of partitions.

Again how do I verify if everything went right and it is working as expected?

Prakash J
  • 21
  • 3
  • _Will this work as expected_ - [it won't](https://stackoverflow.com/q/31424396/6910411). – zero323 Jan 05 '19 at 19:35
  • @user6910411 Is there a way to achieve this through some other way, i.e. get all rows for a "year", "month", "day" combo into the same partition? I am looking into https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/rdd/PairRDDFunctions.html#partitionBy(org.apache.spark.Partitioner) . Does something similar exist for a dataframe? – Prakash J Jan 06 '19 at 07:20
  • `partitionBy` doesn't work like that either. You might consider custom partitioner from the legacy API,. – zero323 Jan 06 '19 at 13:38
  • @user6910411 I have added something that I tried out it as part of the question. Would this work? If not, why? If yes, is there a quick way to verify if this worked? – Prakash J Jan 06 '19 at 17:04
  • At first glance it looks like a step in the right direction. – zero323 Jan 07 '19 at 13:45

0 Answers0