
I would like to use GroupBy operator on a DataFrame with my own equality comparators.

Let's assume that I want to execute something like:

df.groupBy("Year","Month").sum("Counter")

In this DataFrame:

Year | Month   | Counter
-----|---------|--------
2012 | Jan     | 100
12   | January | 200
12   | Janu    | 300
2012 | Feb     | 400
13   | Febr    | 500

I have to implement two comparators:

1) For column Year: p.e. "2012" == "12"

2) For column Month: p.e. "Jan" == "January" == "Janu"

Let's assume that I have already implemented these two comparators. How can I invoke them? I already know that I have to convert my DataFrame into an RDD to make it possible to use my comparators.

I thought about using RDD GroupBy.

Note that I really need to do this using comparators. I can't use UDFs, change the data, or create new columns. The future idea is to have ciphertext columns, together with functions that allow me to compare whether two ciphertexts encrypt the same value. I want to use those functions in my comparators.

Edit:

At the moment, I am trying to do this with only one column, like:

df.groupBy("Year").sum("Counter")

I have a Wrapper class:

class ExampleWrapperYear (val year: Any) extends Serializable {
      // override hashCode and Equals methods
}

Then, I am doing this:

val rdd = df.rdd.keyBy(a => new ExampleWrapperYear(a(0))).groupByKey()

My question here is how to do the "sum", and how to use keyBy with multiple columns so that ExampleWrapperYear and ExampleWrapperMonth are used together.
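To make the goal concrete, here is a plain-Scala sketch (no Spark) of the grouping behaviour I want. The wrappers below are simplified stand-ins for my classes: they normalize the raw value inside equals/hashCode, where the real versions would call my ciphertext-comparison functions instead.

```scala
// Simplified stand-in for ExampleWrapperYear: equals/hashCode agree on a
// normalized form of the raw value (real version would compare ciphertexts).
class ExampleWrapperYear(val year: Int) extends Serializable {
  private def normalized: Int = 2000 + year % 100
  override def hashCode(): Int = normalized.hashCode
  override def equals(that: Any): Boolean = that match {
    case y: ExampleWrapperYear => y.normalized == this.normalized
    case _                     => false
  }
}

// Simplified stand-in for ExampleWrapperMonth.
class ExampleWrapperMonth(val month: String) extends Serializable {
  private def normalized: String = month match {
    case "January" | "Janu"  => "Jan"
    case "February" | "Febr" => "Feb"
    case other               => other
  }
  override def hashCode(): Int = normalized.hashCode
  override def equals(that: Any): Boolean = that match {
    case m: ExampleWrapperMonth => m.normalized == this.normalized
    case _                      => false
  }
}

val rows = Seq(
  (2012, "Jan", 100), (12, "January", 200), (12, "Janu", 300),
  (2012, "Feb", 400), (13, "Febr", 500))

// A tuple of wrappers serves as a composite key: Tuple2's equals/hashCode
// delegate to the components, which is the contract groupBy relies on.
val sums = rows
  .groupBy { case (y, m, _) => (new ExampleWrapperYear(y), new ExampleWrapperMonth(m)) }
  .map { case (_, group) => group.map(_._3).sum }
// sums contains 600 (Jan 2012), 400 (Feb 2012), 500 (Feb 2013)
```

On the RDD the analogous call would presumably be `keyBy` with the same tuple of wrappers, followed by `groupByKey()` and a `mapValues(_.map(...).sum)`.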

proxyfss
  • why do you have to use comparators? – Raphael Roth Mar 13 '19 at 19:25
  • My goal is to understand if I can do this with comparators. As I said in the question, my goal in the future is to work with ciphertext, where I can not use UDFs to decipher the data on the server side. Imagine that I have an "Age" column with 3 entries: 20, 25, 20. After encrypting the data with a non-deterministic cipher, I get a column with XXX, YYY, ZZZ (always different values). I have a function that tells me that XXX and ZZZ are the same (just an example). So I want to call a comparator that invokes my function that lets me know that they are the same. – proxyfss Mar 13 '19 at 19:41
  • So that I understand correctly, the comparator tells you that they are equal but you are unable to map the ciphered values to a space in which they are equal? – Oli Mar 14 '19 at 09:36
  • Yes, that's my challenge. I found this [example](https://stackoverflow.com/questions/30785615/reducebykey-with-a-byte-array-as-the-key?fbclid=IwAR2fA9hYv3L2CY3pZkDf3oYLrtrzgg9r14P_MG2vmxiSK2JvHkPw6Q9y6y8) which probably can be very similar for my propose. But I don't understand how to apply it in my situation. For example, in the query that I presented above. – proxyfss Mar 14 '19 at 11:18
  • To make it simple, if you only have a function that tells you whether or not keys are equal, the only solution you have is a cross product and to make every pair comparisons. The only way to do better than that, is for you to tell us more about your specific use case so that we can find a trick to reduce the complexity. – Oli Mar 14 '19 at 12:43
  • I updated the question to better understand what I want to do – proxyfss Mar 14 '19 at 13:42
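Oli's point about the cross product can be sketched as follows: with only a pairwise equality oracle (no usable hashCode), grouping degrades to comparing each element against a representative of every existing group, which is quadratic in the worst case. `eqYear` here is a hypothetical stand-in for a ciphertext-equality function.

```scala
// Group values using only a pairwise equality oracle: each element is
// compared against the head of every existing group. O(n * groups) calls
// to eq, i.e. quadratic in the worst case.
def groupWith[A](xs: Seq[A])(eq: (A, A) => Boolean): Vector[Vector[A]] =
  xs.foldLeft(Vector.empty[Vector[A]]) { (groups, x) =>
    groups.indexWhere(g => eq(g.head, x)) match {
      case -1 => groups :+ Vector(x)          // no matching group: start one
      case i  => groups.updated(i, groups(i) :+ x)
    }
  }

// Hypothetical oracle: pretend years equal mod 100 are "the same ciphertext".
val eqYear = (a: Int, b: Int) => a % 100 == b % 100
val groups = groupWith(Seq(2012, 12, 13, 2013))(eqYear)
// groups: Vector(Vector(2012, 12), Vector(13, 2013))
```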

2 Answers


You can use UDFs to implement the logic that maps the values to a standard year/month format:

  import org.apache.spark.sql.functions.{udf, col}

  def toYear : (Integer) => Integer = (year:Integer)=>{
    2000 + year % 100 //assuming all years in 2000-2999 range
  }

  def toMonth : (String) => String = (month:String)=>{
    month match {
      case "January"=> "Jan"
      case "Janu"=> "Jan"
      case "February" => "Feb"
      case "Febr" => "Feb"
      case _ => month
    }
  }

  val toYearUdf = udf(toYear)
  val toMonthUdf = udf(toMonth)

  df.groupBy( toYearUdf(col("Year")), toMonthUdf(col("Month"))).sum("Counter").show()
Ranga Vure
  • Probably you did not notice my last paragraph. I really need to use comparators to do this operation. The future idea is to have ciphertext columns, in which I have functions that allow me to compare if two ciphertexts are the same. I updated the last paragraph so that other users understand why I can't use UDFs. – proxyfss Mar 13 '19 at 18:37

This solution should work. Here are the case classes (we can call these the comparators), which implement hashCode and equals.

You can modify/update hashCode and equals based on the different ciphertexts.

  case class Year(var year:Int){

    override def hashCode(): Int = {
      this.year = this.year match {
        case 2012 => 2012
        case 12 => 2012
        case 13 => 2013
        case _ => this.year
      }
      this.year.hashCode()
    }

    override def equals(that: Any): Boolean = {
      val year1 = 2000 + that.asInstanceOf[Year].year % 100
      val year2 = 2000 + this.year % 100
      year1 == year2
    }
  }

  case class Month(var month:String){

    override def hashCode(): Int = {
      this.month = this.month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => this.month
      }
      this.month.hashCode
    }

    override def equals(that: Any): Boolean ={
      val month1 = this.month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => this.month
      }
      val month2 = that.asInstanceOf[Month].month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => that.asInstanceOf[Month].month
      }
      month1.equals(month2)
    }
  }

Here is the important comparator for the grouped key, which simply delegates to the individual column comparators:

  case class Key(var year:Year, var month:Month){

    override def hashCode(): Int ={
      this.year.hashCode() + this.month.hashCode()
    }

    override def equals(that: Any): Boolean = {
      this.year.equals(that.asInstanceOf[Key].year) &&
        this.month.equals(that.asInstanceOf[Key].month)
    }
  }

  case class Record(year:Int,month:String,counter:Int)

  import spark.implicits._ // needed for .as[Record] and .toDS()

  val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("data.csv").as[Record]

  df.rdd.groupBy[Key](
      (record:Record)=>Key(Year(record.year), Month(record.month)))
      .map(x=> Record(x._1.year.year, x._1.month.month, x._2.toList.map(_.counter).sum))
      .toDS().show()

which gives

+----+-----+-------+
|year|month|counter|
+----+-----+-------+
|2012|  Feb|    800|
|2013|  Feb|    500|
|2012|  Jan|    700|
+----+-----+-------+

for this input in data.csv:

Year,Month,Counter
2012,February,400
2012,Jan,100
12,January,200
12,Janu,300
2012,Feb,400
13,Febr,500
2012,Jan,100

Please note that, for the case classes Year and Month, hashCode also updates the stored value to the standard value (otherwise it is unpredictable which raw value ends up as the group key).
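That last point can be seen with a plain-Scala groupBy on a hypothetical simplified Month that normalizes in equals/hashCode but does not rewrite the stored value: the key that survives grouping is just the raw value first encountered for its group.

```scala
// Hypothetical Month that normalizes for comparison only, without
// updating the stored value the way the answer's case classes do.
case class Month(month: String) {
  private def norm: String = month match {
    case "January" | "Janu"  => "Jan"
    case "February" | "Febr" => "Feb"
    case other               => other
  }
  override def hashCode(): Int = norm.hashCode
  override def equals(that: Any): Boolean = that match {
    case m: Month => m.norm == this.norm
    case _        => false
  }
}

val grouped = Seq(Month("January"), Month("Jan"), Month("Janu")).groupBy(identity)
// One group, but the surviving key holds whichever raw spelling was seen
// first ("January" here), not the normalized "Jan".
```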

Ranga Vure