I am processing the following table and would like to compute a new column (outcome) based on the distinct values of two other columns.

| id1 | id2 | outcome |
|  1  |  1  |    1    |
|  1  |  1  |    1    |
|  1  |  3  |    2    |
|  2  |  5  |    1    |
|  3  |  1  |    1    |
|  3  |  2  |    2    |
|  3  |  3  |    3    |

The outcome should start at 1 and increase in order, based on the combined value of id1 and id2. Any hints on how this can be accomplished in Scala? row_number doesn't seem to be useful in this case.

The logic is that numbering restarts for each unique value of id1: the rows with min(id2) for that id1 are assigned 1, the next distinct id2 is assigned 2, and so on.

Junaid
  • Can you explain the logic (pattern) used to compute the outcome column? (1, 3) -> 2, but then (2, 5) is 1. – Belwal May 08 '20 at 11:49
  • @Belwal The logic is to assign integer values based on the sorted order of the distinct combinations. The outcome is assigned separately for each id1 value, counting each distinct id2 only once. For id1 -> 1, id2 -> 1, the outcome will always be 1 no matter how many times the same combination is repeated. For `id1` = 2, there is only one corresponding `id2` value, so that min(`id2`) is assigned 1. – Junaid May 08 '20 at 11:59

2 Answers

1

You could try dense_rank(), which gives rows with equal id2 the same rank and leaves no gaps between consecutive ranks.

With your example:

      val df = sqlContext
        .read
        .option("sep","|")
        .option("header", true)
        .option("inferSchema",true)
        .csv("/home/cloudera/files/tests/ids.csv") // read the pipe-separated .csv file
        .cache()

      df.show()
      df.printSchema()

      df.createOrReplaceTempView("table")
      sqlContext.sql(
        """
          |SELECT id1, id2, DENSE_RANK() OVER(PARTITION BY id1 ORDER BY id2) AS outcome
          |FROM table
          |""".stripMargin).show()

Output:

+---+---+-------+
|id1|id2|outcome|
+---+---+-------+
|  2|  5|      1|
|  1|  1|      1|
|  1|  1|      1|
|  1|  3|      2|
|  3|  1|      1|
|  3|  2|      2|
|  3|  3|      3|
+---+---+-------+
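
To try the query without the CSV file, here is a minimal sketch that builds the question's rows in memory. It assumes Spark 2.x or later with a `SparkSession` on the classpath; the object name `DenseRankSqlSketch`, the view name `ids`, and the local master setting are illustrative assumptions, not part of the original answer. The added `ORDER BY` only makes the printed row order stable.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object DenseRankSqlSketch {
  // Runs the DENSE_RANK query against an in-memory copy of the question's rows.
  def withOutcome(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val df = Seq((1, 1), (1, 1), (1, 3), (2, 5), (3, 1), (3, 2), (3, 3))
      .toDF("id1", "id2")
    df.createOrReplaceTempView("ids") // hypothetical view name

    spark.sql(
      """
        |SELECT id1, id2, DENSE_RANK() OVER (PARTITION BY id1 ORDER BY id2) AS outcome
        |FROM ids
        |ORDER BY id1, id2
        |""".stripMargin)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local test run
      .appName("dense-rank-sketch")
      .getOrCreate()
    try withOutcome(spark).show()
    finally spark.stop()
  }
}
```

This should show the same seven rows as the output above, sorted by id1 and then id2.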
Chema
  • With regard to performance, how does dense_rank() perform on larger data sets? – Junaid May 08 '20 at 12:37
  • The main cost here is the data shuffle and the sorting required by the window. A useful link: https://stackoverflow.com/questions/54332942/spark-windowing-function-vs-group-by-performance-issue – Belwal May 08 '20 at 12:49
1

Use a Window function to partition the rows by the first id, and order each partition by the second id.

Then you just need to assign a rank (dense_rank) over each window partition.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

df
  .withColumn("outcome", dense_rank().over(Window.partitionBy("id1").orderBy("id2")))
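
To see what that window expression computes without a Spark session, the sketch below reproduces the same numbering with plain Scala collections on the question's sample rows. It is only an illustration of the dense-rank logic, not how Spark evaluates it; the names `DenseRankSketch`, `Row`, and `denseRank` are made up for this example.

```scala
object DenseRankSketch {
  final case class Row(id1: Int, id2: Int)

  // Returns (id1, id2, outcome) for every input row, in the input order.
  def denseRank(rows: Seq[Row]): Seq[(Int, Int, Int)] = {
    // For each id1, map every distinct id2 to its 1-based position in sorted order.
    val rankById1: Map[Int, Map[Int, Int]] =
      rows.groupBy(_.id1).map { case (id1, group) =>
        val ranks = group.map(_.id2).distinct.sorted.zipWithIndex
          .map { case (id2, idx) => id2 -> (idx + 1) }
          .toMap
        id1 -> ranks
      }

    // Duplicate (id1, id2) pairs look up the same rank, so they share an outcome.
    rows.map(r => (r.id1, r.id2, rankById1(r.id1)(r.id2)))
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      Row(1, 1), Row(1, 1), Row(1, 3),
      Row(2, 5),
      Row(3, 1), Row(3, 2), Row(3, 3)
    )
    denseRank(sample).foreach(println)
  }
}
```

By contrast, row_number() over the same window would number the two (1, 1) rows 1 and 2, which is why it does not fit this use case.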

Belwal
  • I believe the partitioning will allow this to be processed over large sets of data. – Junaid May 08 '20 at 12:49
  • It is because of the use case, not the data size. I think the links below are useful for more details: https://stackoverflow.com/questions/2404565/sql-server-difference-between-partition-by-and-group-by, https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-windows.html – Belwal May 10 '20 at 15:14