I'm trying to solve this kind of problem with Spark 2, but I can't find a solution.

I have a dataframe A :

+----+-------+------+
|id  |COUNTRY| MONTH|
+----+-------+------+
|  1 |    US |    1 |
|  2 |    FR |    1 |
|  4 |    DE |    1 |
|  5 |    DE |    2 |
|  3 |    DE |    3 |
+----+-------+------+

And a dataframe B :

+-------+------+------+
|COLUMN |VALUE | PRIO |
+-------+------+------+
|COUNTRY|   US |    5 |
|COUNTRY|   FR |   15 |
|MONTH  |   3  |    2 |
+-------+------+------+

The idea is to apply "rules" of dataframe B on dataframe A in order to get this result :

dataframe A' :

+----+-------+------+------+
|id  |COUNTRY| MONTH| PRIO |
+----+-------+------+------+
|  1 |    US |    1 |    5 |
|  2 |    FR |    1 |   15 |
|  4 |    DE |    1 |   20 |
|  5 |    DE |    2 |   20 |
|  3 |    DE |    3 |    2 |
+----+-------+------+------+

I tried something like this:

dfB.collect.foreach { r =>
  val dfAp = dfA.where(col(r.getAs[String]("COLUMN")) === r.getAs[String]("VALUE"))
  dfAp.withColumn("PRIO", lit(r.getAs[Int]("PRIO")))
}

But I'm sure it's not the right way: each iteration creates a new dataframe whose result is immediately discarded.

What is the right strategy to solve this kind of problem in Spark?

Tony Rx

2 Answers


Working under the assumption that the set of rules is reasonably small (possible concerns are the size of the data and the size of the generated expression, which in the worst-case scenario can crash the planner), the simplest solution is to collect the rules locally and map them to a SQL expression:

import org.apache.spark.sql.functions.{coalesce, col, lit, when}

val df = Seq(
  (1, "US", "1"), (2, "FR", "1"), (4, "DE", "1"),
  (5, "DE", "2"), (3, "DE", "3")
).toDF("id", "COUNTRY", "MONTH")

val rules = Seq(
  ("COUNTRY", "US", 5), ("COUNTRY", "FR", 15), ("MONTH", "3", 2)
).toDF("COLUMN", "VALUE", "PRIO")


val prio = coalesce(rules.as[(String, String, Int)].collect.map {
  case (c, v, p) => when(col(c) === v, p)
} :+ lit(20): _*)

df.withColumn("PRIO", prio).show()
+---+-------+-----+----+
| id|COUNTRY|MONTH|PRIO|
+---+-------+-----+----+
|  1|     US|    1|   5|
|  2|     FR|    1|  15|
|  4|     DE|    1|  20|
|  5|     DE|    2|  20|
|  3|     DE|    3|   2|
+---+-------+-----+----+

You can replace coalesce with least or greatest to apply the smallest or the largest matching value respectively.
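For instance, with least the default becomes an upper bound rather than a fallback: least skips the nulls produced by non-matching rules, so a row matched by several rules gets the smallest matching priority. A sketch, reusing the rules dataframe collected above:

    import org.apache.spark.sql.functions.{col, least, lit, when}

    // least ignores nulls from non-matching when(...) expressions;
    // lit(20) caps the result, so a rule priority above 20 would be clamped.
    val prioLeast = least(rules.as[(String, String, Int)].collect.map {
      case (c, v, p) => when(col(c) === v, p)
    } :+ lit(20): _*)

    df.withColumn("PRIO", prioLeast)

With the sample rules the result is the same as with coalesce, since every rule priority is below 20.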

With a larger set of rules you could:

  • melt data to convert to a long format.

    val dfLong = df.melt(Seq("id"), df.columns.tail, "COLUMN", "VALUE")
    
  • join by column and value.

  • Aggregate PRIO by id with an appropriate aggregation function (for example min):

    import org.apache.spark.sql.functions.min

    val priorities = dfLong.join(rules, Seq("COLUMN", "VALUE"))
      .groupBy("id")
      .agg(min("PRIO").alias("PRIO"))
    
  • Outer join the output with df by id.

    df.join(priorities, Seq("id"), "leftouter").na.fill(20)
    
    +---+-------+-----+----+   
    | id|COUNTRY|MONTH|PRIO|
    +---+-------+-----+----+
    |  1|     US|    1|   5|
    |  2|     FR|    1|  15|
    |  4|     DE|    1|  20|
    |  5|     DE|    2|  20|
    |  3|     DE|    3|   2|
    +---+-------+-----+----+
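Note that Dataset has no built-in melt in Spark 2, so the first step above assumes a helper. A minimal sketch (names are illustrative), stacking the non-id columns into (COLUMN, VALUE) pairs with explode over an array of structs:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

    def melt(df: DataFrame,
             idVars: Seq[String],
             valueVars: Seq[String],
             varName: String = "COLUMN",
             valueName: String = "VALUE"): DataFrame = {
      // One struct per melted column: (column name, column value).
      // Values are cast to string so the array elements share one type.
      val pairs = explode(array(valueVars.map { c =>
        struct(lit(c).alias(varName), col(c).cast("string").alias(valueName))
      }: _*))

      df.select(idVars.map(col) :+ pairs.alias("_pair"): _*)
        .select(idVars.map(col) :+
          col(s"_pair.$varName") :+ col(s"_pair.$valueName"): _*)
    }

With df from above, melt(df, Seq("id"), df.columns.tail) yields one (id, COLUMN, VALUE) row per original cell, ready to be joined with the rules.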
    
zero323

Let's assume the rules of dataframe B are limited.

I have created a dataframe "df" for the table below:

+---+-------+-----+
| id|COUNTRY|MONTH|
+---+-------+-----+
|  1|     US|    1|
|  2|     FR|    1|
|  4|     DE|    1|
|  5|     DE|    2|
|  3|     DE|    3|
+---+-------+-----+

By using a UDF:

val code = udf { (x: String, y: Int) =>
  if (x == "US") 5 else if (x == "FR") 15 else if (y == 3) 2 else 20
}

df.withColumn("PRIO",code($"COUNTRY",$"MONTH")).show()

Output:

+---+-------+-----+----+
| id|COUNTRY|MONTH|PRIO|
+---+-------+-----+----+
|  1|     US|    1|   5|
|  2|     FR|    1|  15|
|  4|     DE|    1|  20|
|  5|     DE|    2|  20|
|  3|     DE|    3|   2|
+---+-------+-----+----+
LUZO