15

First, I am very new to Spark.

I have millions of records in my Dataset, and I want to group by the name column and find the maximum age for each name. I am getting correct results, but I need all columns in my result set.

Dataset<Row> resultset = studentDataSet.select("*").groupBy("name").max("age");
resultset.show(1000,false);

However, I am getting only the name and max(age) columns in my resulting Dataset.

Anup Sapkale

5 Answers

13

For this you have to try a different approach. You were almost there, but let me help you understand.

Dataset<Row> resultset = studentDataSet.groupBy("name").max("age");

Now what you can do is join the resultset back with studentDataSet:

Dataset<Row> joinedDS = studentDataset.join(resultset, "name");

The thing with groupBy is that after applying it you get a RelationalGroupedDataset, so the output depends on which aggregation you perform next (sum, min, mean, max, etc.); the result of that aggregation is combined with the groupBy column(s).

In your case the name column is combined with the max of age, so it returns only those two columns. Likewise, if you applied groupBy on age and then max on the 'age' column, you would get two columns: one is age and the second is max(age).

Note: the code is not tested, so please make changes if needed. Hope this clears up your query.
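
As an illustration only, here is a hedged sketch of the same idea in Scala (the Java Dataset API is analogous), assuming a spark-shell session where spark.implicits._ is in scope; the extra column is made-up sample data, and the final filter keeps only the rows that actually carry the per-name maximum:

import org.apache.spark.sql.functions.{col, max}

// made-up sample rows; only name and age come from the question
val studentDataSet = Seq(
  ("alice", 20, "math"),
  ("alice", 25, "physics"),
  ("bob", 30, "chemistry")
).toDF("name", "age", "subject")

// per-name maximum, then join it back to recover every column
val maxAges = studentDataSet.groupBy("name").agg(max("age").as("max_age"))
val joined = studentDataSet.join(maxAges, "name")

// keep only the rows whose age equals the group maximum
joined.filter(col("age") === col("max_age")).drop("max_age").show(false)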

Akash Sethi
  • Hey, thank you! studentDataset.join(resultset, expression) solved my problem. – Anup Sapkale Jan 09 '17 at 04:53
  • I know it is a 1-year-old post, but still: hey Anup, good to know that you got the solution. Can you help us also by posting your code? – NickyPatel Feb 14 '18 at 06:48
  • Hey Akash and Anup, this does not work for me. Even after the join, I am getting only two columns and not the other ones. Does it have something to do with the Spark version as well? I am using Spark 2.1. – Jitender Yadav Jul 12 '18 at 05:30
  • I also think the join can produce a second round of shuffling, so I added another solution that does not join at the end AND maintains strict Dataset usage (no DataFrames). – codeaperature Jun 05 '19 at 19:07
3

The accepted answer isn't ideal because it requires a join. Joining big DataFrames can cause a big shuffle that'll execute slowly.

Let's create a sample data set and test the code:

val df = Seq(
  ("bob", 20, "blah"),
  ("bob", 40, "blah"),
  ("karen", 21, "hi"),
  ("monica", 43, "candy"),
  ("monica", 99, "water")
).toDF("name", "age", "another_column")

This code should run faster with large DataFrames.

import org.apache.spark.sql.functions.max   // required unless functions._ is already imported

df
  .groupBy("name")
  .agg(
    max("name").as("name1_dup"),
    max("another_column").as("another_column"),
    max("age").as("age")
  ).drop(
    "name1_dup"
  ).show()

+------+--------------+---+
|  name|another_column|age|
+------+--------------+---+
|monica|         water| 99|
| karen|            hi| 21|
|   bob|          blah| 40|
+------+--------------+---+
Powers
  • .agg( max("name").as("name1_dup"), max("another_column").as("another_column"), max("age").as("age") ) gives a compile-time exception – Anuj Mehra Mar 15 '19 at 13:22
  • If you change the row `("monica", 43, "candy")` to `("monica", 43, "zebra")`, this example does _not_ work. @AnujMehra - The code interprets for me in the REPL. – codeaperature Jun 04 '19 at 23:44
2

What you're trying to achieve is:

  1. group rows by name
  2. reduce each group to 1 row with maximum age

This alternative achieves that output without using an aggregate:

import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._


object TestJob5 {

  def main (args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder()
      .appName(this.getClass.getName.replace("$", ""))
      .master("local")
      .getOrCreate()

    val sc = sparkSession.sparkContext
    sc.setLogLevel("ERROR")

    import sparkSession.sqlContext.implicits._

    val rawDf = Seq(
      ("Moe",  "Slap",  7.9, 118),
      ("Larry",  "Spank",  8.0, 115),
      ("Curly",  "Twist", 6.0, 113),
      ("Laurel", "Whimper", 7.53, 119),
      ("Hardy", "Laugh", 6.0, 118),
      ("Charley",  "Ignore",   9.7, 115),
      ("Moe",  "Spank",  6.8, 118),
      ("Larry",  "Twist", 6.0, 115),
      ("Charley",  "fall", 9.0, 115)
    ).toDF("name", "requisite", "funniness_of_requisite", "age")

    rawDf.show(false)
    rawDf.printSchema

    // window over all rows sharing the same name
    val nameWindow = Window
      .partitionBy("name")

    val aggDf = rawDf
      .withColumn("id", monotonically_increasing_id)                        // unique id, used as a tie-breaker
      .withColumn("maxFun", max("funniness_of_requisite").over(nameWindow)) // per-name maximum
      .withColumn("count", count("name").over(nameWindow))                  // per-name row count
      .where(col("maxFun") === col("funniness_of_requisite"))               // keep only the rows holding the maximum
      .withColumn("minId", min("id").over(nameWindow))                      // smallest id among those max rows
      .where(col("minId") === col("id"))                                    // keep exactly one row per name
      .drop("maxFun")
      .drop("minId")
      .drop("id")

    aggDf.printSchema

    aggDf.show(false)
  }

}

Bear in mind that a group could potentially have more than one row with the maximum value, so you need to pick one by some logic. In the example I assume it doesn't matter, so I just assign a unique number and keep one row per group.
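
If you want an explicit, deterministic tie-breaker, a hedged variant of the same window idea (reusing the imports from the listing above; the ordering columns are only an illustrative choice) is to rank the rows inside each partition and keep rank 1:

val rankedDf = rawDf
  .withColumn("rn", row_number().over(
    Window.partitionBy("name")
      .orderBy(col("funniness_of_requisite").desc, col("age").desc)))
  .where(col("rn") === 1)   // exactly one row per name, even when the maximum is tied
  .drop("rn")

rankedDf.show(false)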

Rubber Duck
1

Noting that a subsequent join means extra shuffling, and that some of the other solutions seem inaccurate in what they return or even turn the Dataset into a DataFrame, I sought a better solution. Here is mine:

case class People(name: String, age: Int, other: String)   
val df = Seq(
  People("Rob", 20, "cherry"),
  People("Rob", 55, "banana"),
  People("Rob", 40, "apple"),
  People("Ariel", 55, "fox"),
  People("Vera", 43, "zebra"),
  People("Vera", 99, "horse")
).toDS

val oldestResults = df
  .groupByKey(_.name)
  .mapGroups {
    case (nameKey, peopleIter) => {
      // walk the group's iterator and keep the person with the highest age
      var oldestPerson = peopleIter.next
      while (peopleIter.hasNext) {
        val nextPerson = peopleIter.next
        if (nextPerson.age > oldestPerson.age) oldestPerson = nextPerson
      }
      oldestPerson
    }
  }

oldestResults.show

This produces:

+-----+---+------+
| name|age| other|
+-----+---+------+
|Ariel| 55|   fox|
|  Rob| 55|banana|
| Vera| 99| horse|
+-----+---+------+
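
For what it's worth, the same reduction can probably be written more compactly with reduceGroups, still staying in the typed Dataset API (a sketch that assumes the same People case class and spark.implicits._ in scope):

val oldestCompact = df
  .groupByKey(_.name)
  .reduceGroups((a, b) => if (a.age >= b.age) a else b)  // keep the older of every pair
  .map(_._2)                                             // drop the grouping key, leaving a Dataset[People]

oldestCompact.show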
codeaperature
0

You need to remember that aggregate functions reduce the rows, so you have to specify which row's age you want via a reducing function. If you want to retain all rows of a group (warning: this can cause explosions or skewed partitions), you can collect them as a list. You can then use a UDF (user-defined function) to reduce the list down to one row by your criteria, in this example funniness_of_requisite, and then extract the columns belonging to that single reduced row with another UDF. For the purpose of this answer I assume you wish to retain the age of the person who has the max funniness_of_requisite.

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType}

import scala.collection.mutable


object TestJob4 {

def main (args: Array[String]): Unit = {

val sparkSession = SparkSession
  .builder()
  .appName(this.getClass.getName.replace("$", ""))
  .master("local")
  .getOrCreate()

val sc = sparkSession.sparkContext

import sparkSession.sqlContext.implicits._

val rawDf = Seq(
  (1, "Moe",  "Slap",  7.9, 118),
  (2, "Larry",  "Spank",  8.0, 115),
  (3, "Curly",  "Twist", 6.0, 113),
  (4, "Laurel", "Whimper", 7.53, 119),
  (5, "Hardy", "Laugh", 6.0, 18),
  (6, "Charley",  "Ignore",   9.7, 115),
  (2, "Moe",  "Spank",  6.8, 118),
  (3, "Larry",  "Twist", 6.0, 115),
  (3, "Charley",  "fall", 9.0, 115)
).toDF("id", "name", "requisite", "funniness_of_requisite", "age")

rawDf.show(false)
rawDf.printSchema

val rawSchema = rawDf.schema

// UDF that reduces the collected rows to the one with the highest funniness_of_requisite
val fUdf = udf(reduceByFunniness, rawSchema)

// UDF that extracts the age from the reduced row
val nameUdf = udf(extractAge, IntegerType)

val aggDf = rawDf
  .groupBy("name")
  .agg(
    count(struct("*")).as("count"),
    max(col("funniness_of_requisite")),
    collect_list(struct("*")).as("horizontal")
  )
  .withColumn("short", fUdf($"horizontal"))
  .withColumn("age", nameUdf($"short"))
  .drop("horizontal")

aggDf.printSchema

aggDf.show(false)
}

def reduceByFunniness= (x: Any) => {

val d = x.asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]]

val red = d.reduce((r1, r2) => {

  val funniness1 = r1.getAs[Double]("funniness_of_requisite")
  val funniness2 = r2.getAs[Double]("funniness_of_requisite")

  val r3 = funniness1 match {
    case a if a >= funniness2 =>
      r1
    case _ =>
      r2
  }

  r3
})

red
}

def extractAge = (x: Any) => {

  val d = x.asInstanceOf[GenericRowWithSchema]

  d.getAs[Int]("age")
}
}

Here is the output:

+-------+-----+---------------------------+-------------------------------+---+
|name   |count|max(funniness_of_requisite)|short                          |age|
+-------+-----+---------------------------+-------------------------------+---+
|Hardy  |1    |6.0                        |[5, Hardy, Laugh, 6.0, 18]     |18 |
|Moe    |2    |7.9                        |[1, Moe, Slap, 7.9, 118]       |118|
|Curly  |1    |6.0                        |[3, Curly, Twist, 6.0, 113]    |113|
|Larry  |2    |8.0                        |[2, Larry, Spank, 8.0, 115]    |115|
|Laurel |1    |7.53                       |[4, Laurel, Whimper, 7.53, 119]|119|
|Charley|2    |9.7                        |[6, Charley, Ignore, 9.7, 115] |115|
+-------+-----+---------------------------+-------------------------------+---+
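
As a side note (not part of the original answer), the same "row with the maximum funniness" effect can likely be obtained without UDFs by taking the max of a struct whose first field is the value being maximized, since structs compare field by field; a hedged sketch using the same rawDf and imports as above:

// struct ordering is lexicographic, so max() picks the row with the highest
// funniness_of_requisite and carries its other fields along
val structMaxDf = rawDf
  .groupBy("name")
  .agg(max(struct(col("funniness_of_requisite"), col("age"), col("requisite"), col("id"))).as("top"))
  .select(col("name"), col("top.funniness_of_requisite"), col("top.age"))

structMaxDf.show(false)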
Rubber Duck