
I have a DataFrame generated as follows:

df.groupBy($"Hour", $"Category")
  .agg(sum($"value").alias("TotalValue"))
  .sort($"Hour".asc,$"TotalValue".desc))

The results look like:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+

I would like to make new DataFrames based on every unique value of col("Hour"), i.e.

  • for the group of Hour==0
  • for the group of Hour==1
  • for the group of Hour==2 and so on...

So the desired output would be:

df0 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
+----+--------+----------+

df1 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
+----+--------+----------+

and similarly,

df2 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
+----+--------+----------+

Any help is highly appreciated.

EDIT 1:

What I have tried:

df.foreach(row => splitHour(row))

def splitHour(row: Row): Unit = {
  val Hour = row.getAs[Long]("Hour")

  // Build a one-row DataFrame holding just this hour value
  val HourDF = sparkSession.createDataFrame(List((s"$Hour", 1)))
  val hdf = HourDF.withColumnRenamed("_1", "Hour_unique").drop("_2")

  // Join back against the full DataFrame to keep only this hour's rows, then write them out
  val mydf: DataFrame = df.join(hdf, df("Hour") === hdf("Hour_unique"))

  mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
}

PROBLEM WITH THIS STRATEGY:

It took 8 hours to run on a DataFrame df with over 1 million rows, with the Spark job given around 10 GB of RAM on a single node. So the join is turning out to be highly inefficient.

Caveat: I have to write each DataFrame mydf as Parquet with a nested schema that must be maintained (not flattened).
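For comparison, a join-free variant of the same per-hour split (only a rough sketch, reusing the df and output path from the attempt above) would be to collect the distinct hours on the driver and write one filtered slice per hour; filtering does not touch the schema, so the nested structure stays intact:

// Rough sketch: collect the distinct hours on the driver, then filter and write each slice.
// Reuses df and the output path from the attempt above; filter keeps the nested schema as-is.
val hours = df.select("Hour").distinct().collect().map(_.getAs[Long]("Hour"))

hours.foreach { hour =>
  df.filter(df("Hour") === hour)
    .write
    .mode("overwrite")
    .parquet(s"/home/dev/shaishave/etc/myparquet/$hour/")
}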

shubham rajput
  • Could you do `df.write.partitionBy("hour").saveAsTable("myparquet")` to do this? – Denny Lee Jan 15 '17 at 19:34
  • @DennyLee Thanks, this worked 60 times faster than my strategy! But it saves the resultant files with names like `hour=0`, `hour=1`, etc., and I want the files to be saved as `0`, `1`, etc. Could you please give your insights on how to achieve this? – shubham rajput Jan 15 '17 at 20:48
  • You may be able to use hiveContext with the `hive.dynamic.partitioning.custom.pattern` configuration, but one of the advantages of keeping it as `hour=0`, `hour=1`, etc. is that when you run `spark.read.parquet(...)` it will automatically understand the underlying dynamic partitions. Another potential approach would be to rename the folders afterwards (i.e. use the `mv` command), but you would still run into the same issue that `read.parquet` will not automatically understand the dynamic partitions. – Denny Lee Jan 16 '17 at 06:02

3 Answers


As noted in my comments, one potentially easy approach to this problem would be to use:

df.write.partitionBy("hour").saveAsTable("myparquet")

As noted, the folder structure would be myparquet/hour=1, myparquet/hour=2, ..., myparquet/hour=24 as opposed to myparquet/1, myparquet/2, ..., myparquet/24.

To change the folder structure, you could:

  1. Potentially use the Hive configuration setting hcat.dynamic.partitioning.custom.pattern within an explicit HiveContext; more information at HCatalog DynamicPartitions.
  2. Another approach would be to change the file system directly after you have executed the df.write.partitionBy(...).saveAsTable(...) command, with something like for f in *; do mv $f ${f/${f:0:5}/} ; done, which would remove the hour= prefix from each folder name; a Spark-side sketch of the same rename follows this list.
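If you would rather do that rename from Spark instead of the shell, a rough equivalent (a sketch only; the base path below is a placeholder for wherever saveAsTable actually wrote the table) could go through the Hadoop FileSystem API:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

// Sketch: rename <output>/hour=N to <output>/N after the partitioned write.
// The base path is a placeholder for the actual table location.
val spark = SparkSession.builder().getOrCreate()
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val base = new Path("/path/to/myparquet")

fs.listStatus(base)
  .filter(_.getPath.getName.startsWith("hour="))
  .foreach { status =>
    val src = status.getPath
    fs.rename(src, new Path(base, src.getName.stripPrefix("hour=")))
  }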

It is important to note that by changing the naming pattern of the folders, Spark will no longer automatically understand the dynamic partitions when you run spark.read.parquet(...) on that folder, since the partition key (i.e. hour) information is missing from the paths.
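Concretely (a sketch; spark is assumed to be the SparkSession and the path is a placeholder), after renaming the folders each one has to be read individually and the hour re-attached by hand:

import org.apache.spark.sql.functions.lit

// Sketch: with folders renamed to .../0, .../1, ..., partition discovery no longer applies,
// so each directory is read on its own and the Hour column is added back manually.
val df0 = spark.read.parquet("/path/to/myparquet/0").withColumn("Hour", lit(0))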

Denny Lee

Another possible solution:

df.write.mode("overwrite").partitionBy("hour").parquet("address/to/parquet/location")

This is similar to the first answer, except that it writes the Parquet files directly (instead of saveAsTable) and uses mode("overwrite").
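If you then need the per-hour DataFrames from the question, one option (a sketch; the path is the one used in this answer, and basePath is Spark's standard option for keeping the partition column when reading a single partition directory) is to read the partitions back individually:

import org.apache.spark.sql.SparkSession

// Sketch: read one hour back from the partitioned output written above.
// Without the basePath option the partition column "hour" would be dropped,
// because only the hour=0 directory is being read.
val spark = SparkSession.builder().getOrCreate()

val df0 = spark.read
  .option("basePath", "address/to/parquet/location")
  .parquet("address/to/parquet/location/hour=0")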

A.M.
// If you want to divide a dataset into n equal parts
double[] arraySplit = {1, 1, 1, ..., 1};  // n weights; use different values to split by ratio instead

List<Dataset<String>> datasetList = dataset.randomSplitAsList(arraySplit, 1);  // second argument is the random seed
Fareanor
  • Please edit your post to include an explanation of the code in your answer. This will make your solution more useful to others, and make it more likely for the post to be upvoted :) – Das_Geek Oct 04 '19 at 19:10