
I am using Spark SQL 2.4.1. How can I do joins that depend on the value of a column? I need to get multiple lookup values from the map_val column for the given value columns, as shown below.

Sample data:

import spark.implicits._ // already in scope in spark-shell; needed for toDF in an application

val data = List(
  ("20", "score", "school", "2018-03-31", 14, 12),
  ("21", "score", "school", "2018-03-31", 13, 13),
  ("22", "rate", "school", "2018-03-31", 11, 14),
  ("21", "rate", "school", "2018-03-31", 13, 12)
)
val df = data.toDF("id", "code", "entity", "date", "value1", "value2")

df.show

+---+-----+------+----------+------+------+
| id| code|entity|      date|value1|value2|
+---+-----+------+----------+------+------+
| 20|score|school|2018-03-31|    14|    12|
| 21|score|school|2018-03-31|    13|    13|
| 22| rate|school|2018-03-31|    11|    14|
| 21| rate|school|2018-03-31|    13|    12|
+---+-----+------+----------+------+------+

Lookup dataset rateDs:

val rateDs = List(
  ("21","2018-01-31","2018-06-31", 12 ,"C"),
  ("21","2018-01-31","2018-06-31", 13 ,"D")
).toDF("id","start_date","end_date", "map_code","map_val")

rateDs.show

+---+----------+----------+--------+-------+
| id|start_date|  end_date|map_code|map_val|
+---+----------+----------+--------+-------+
| 21|2018-01-31|2018-06-31|      12|      C|
| 21|2018-01-31|2018-06-31|      13|      D|
+---+----------+----------+--------+-------+

Joining with the lookup table on id, and on date falling between start_date and end_date, to get the map_val column:

val resultDs = df.filter(col("code").equalTo(lit("rate"))).join(rateDs,
  df.col("date").between(rateDs.col("start_date"), rateDs.col("end_date"))
    .and(rateDs.col("id").equalTo(df.col("id")))
    //.and(rateDs.col("mapping_value").equalTo(df.col("mean")))
  , "left"
)
//.drop("start_date")
//.drop("end_date")

resultDs.show

+---+----+------+----------+------+------+----+----------+----------+--------+-------+
| id|code|entity|      date|value1|value2|  id|start_date|  end_date|map_code|map_val|
+---+----+------+----------+------+------+----+----------+----------+--------+-------+
| 21|rate|school|2018-03-31|    13|    12|  21|2018-01-31|2018-06-31|      13|      D|
| 21|rate|school|2018-03-31|    13|    12|  21|2018-01-31|2018-06-31|      12|      C|
+---+----+------+----------+------+------+----+----------+----------+--------+-------+

The expected output should be:

+---+----+------+----------+------+------+----+----------+----------+--------+-------+
| id|code|entity|      date|value1|value2|  id|start_date|  end_date|map_code|map_val|
+---+----+------+----------+------+------+----+----------+----------+--------+-------+
| 21|rate|school|2018-03-31|     D|     C|  21|2018-01-31|2018-06-31|      13|      D|
| 21|rate|school|2018-03-31|     D|     C|  21|2018-01-31|2018-06-31|      12|      C|
+---+----+------+----------+------+------+----+----------+----------+--------+-------+

Please let me know if any more details are needed.

BdEngineer
  • Not sure I understand the actual problem – thebluephantom Jul 28 '20 at 15:45
  • Why does the output contain 4 rows for code=rate and not 2? – Som Jul 28 '20 at 15:49
  • If you are only interested in the rows with `code == rate`, you can filter for these rows before the `join`. – Shaido Jul 29 '20 at 01:14
  • The join is working as expected. If you need the expected output, add more conditions to your filter; as @Shaido-ReinstateMonica stated, putting a condition on the code column will solve your problem. – Suhas NM Jul 29 '20 at 02:10
  • @Shaido - Reinstate Monica I corrected the question, please check it once more. The whole question is how to look up the columns "value1" and "value2", which contain "map_code" values that I need to replace with the respective "map_val". – BdEngineer Jul 29 '20 at 05:06
  • @SomeshwarKale I corrected the join now, please check the edited question. The whole question is how to look up the columns "value1" and "value2", which contain "map_code" values that I need to replace with the respective "map_val". – BdEngineer Jul 29 '20 at 05:10
  • @SuhasNM I corrected the join now, please check the edited question. The whole question is how to look up the columns "value1" and "value2", which contain "map_code" values that I need to replace with the respective "map_val". – BdEngineer Jul 29 '20 at 05:11
  • @BdEngineer, the way you are achieving the expected result is correct. What's the problem here? To improve performance, you may want to add a hint to broadcast `rateDs`, as functions.broadcast(rateDs), in the join itself. – Som Jul 29 '20 at 06:12
  • @SomeshwarKale, please tell me how to map those values. Given what I get from resultDs.show, how do I look up the value1 and value2 columns? – BdEngineer Jul 29 '20 at 06:35
  • @Someshwar Kale my issue is how to do the lookup here; I am stuck. – BdEngineer Jul 29 '20 at 06:36
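Som's broadcast suggestion can be sketched as follows. This is a minimal sketch assuming Spark 2.4, runnable in spark-shell (or as a script with Spark on the classpath); the data is copied from the question, and `rates` and `lookup` are illustrative names, not part of the original code. Because it is a left join, the unmatched `id` 22 row is kept with null lookup columns.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// In spark-shell, getOrCreate() simply returns the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("broadcast-lookup").getOrCreate()
import spark.implicits._

val df = List(
  ("20", "score", "school", "2018-03-31", 14, 12),
  ("21", "score", "school", "2018-03-31", 13, 13),
  ("22", "rate", "school", "2018-03-31", 11, 14),
  ("21", "rate", "school", "2018-03-31", 13, 12)
).toDF("id", "code", "entity", "date", "value1", "value2")

val rateDs = List(
  ("21", "2018-01-31", "2018-06-31", 12, "C"),
  ("21", "2018-01-31", "2018-06-31", 13, "D")
).toDF("id", "start_date", "end_date", "map_code", "map_val")

// Keep only the "rate" rows before joining
val rates = df.filter(col("code") === "rate")

// broadcast() is a hint: ship the small lookup table to every executor instead of shuffling both sides
val lookup = broadcast(rateDs)

// The dates are strings here, so between() compares them lexicographically (fine for yyyy-MM-dd)
val resultDs = rates.join(
  lookup,
  rates("id") === lookup("id") && rates("date").between(lookup("start_date"), lookup("end_date")),
  "left"
)

resultDs.show(false)
```

`broadcast()` is only a planner hint; it pays off when the lookup table is small enough to fit in each executor's memory.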

1 Answer

Try this:

Build a lookup map per id before the join, and use it to replace the values:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// For each id, collect all (map_code -> map_val) pairs into one map, repeated on every row of that id
val newRateDS = rateDs.withColumn("lookUpMap",
  map_from_entries(collect_list(struct(col("map_code"), col("map_val"))).over(Window.partitionBy("id")))
)

newRateDS.show(false)
/**
  * +---+----------+----------+--------+-------+------------------+
  * |id |start_date|end_date  |map_code|map_val|lookUpMap         |
  * +---+----------+----------+--------+-------+------------------+
  * |21 |2018-01-31|2018-06-31|12      |C      |[12 -> C, 13 -> D]|
  * |21 |2018-01-31|2018-06-31|13      |D      |[12 -> C, 13 -> D]|
  * +---+----------+----------+--------+-------+------------------+
  */

// Broadcast the small lookup table and left-join on id and on the date range
val resultDs = df.filter(col("code").equalTo(lit("rate"))).join(broadcast(newRateDS),
  newRateDS("id") === df("id") && df("date").between(newRateDS("start_date"), newRateDS("end_date"))
  //.and(rateDs.col("mapping_value").equalTo(df.col("mean")))
  , "left"
)

// Replace each code with its mapped value; coalesce keeps the original value when there is no lookup
resultDs.withColumn("value1", expr("coalesce(lookUpMap[value1], value1)"))
  .withColumn("value2", expr("coalesce(lookUpMap[value2], value2)"))
  .show(false)

/**
  * +---+----+------+----------+------+------+----+----------+----------+--------+-------+------------------+
  * |id |code|entity|date      |value1|value2|id  |start_date|end_date  |map_code|map_val|lookUpMap         |
  * +---+----+------+----------+------+------+----+----------+----------+--------+-------+------------------+
  * |22 |rate|school|2018-03-31|11    |14    |null|null      |null      |null    |null   |null              |
  * |21 |rate|school|2018-03-31|D     |C     |21  |2018-01-31|2018-06-31|13      |D      |[12 -> C, 13 -> D]|
  * |21 |rate|school|2018-03-31|D     |C     |21  |2018-01-31|2018-06-31|12      |C      |[12 -> C, 13 -> D]|
  * +---+----+------+----------+------+------+----+----------+----------+--------+-------+------------------+
  */
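If codes without a lookup should become null rather than keep their original value, one option is to drop the `coalesce`. A minimal sketch, assuming Spark 2.4 in spark-shell and reusing the question's data; `rates`, `joined` and `strictDs` are illustrative names:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// In spark-shell, getOrCreate() simply returns the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("strict-lookup").getOrCreate()
import spark.implicits._

val df = List(
  ("20", "score", "school", "2018-03-31", 14, 12),
  ("21", "score", "school", "2018-03-31", 13, 13),
  ("22", "rate", "school", "2018-03-31", 11, 14),
  ("21", "rate", "school", "2018-03-31", 13, 12)
).toDF("id", "code", "entity", "date", "value1", "value2")

val rateDs = List(
  ("21", "2018-01-31", "2018-06-31", 12, "C"),
  ("21", "2018-01-31", "2018-06-31", 13, "D")
).toDF("id", "start_date", "end_date", "map_code", "map_val")

// Same per-id lookup map as in the answer
val newRateDS = rateDs.withColumn("lookUpMap",
  map_from_entries(collect_list(struct(col("map_code"), col("map_val"))).over(Window.partitionBy("id"))))

val rates = df.filter(col("code") === "rate")
val joined = rates.join(
  broadcast(newRateDS),
  newRateDS("id") === rates("id") && rates("date").between(newRateDS("start_date"), newRateDS("end_date")),
  "left"
)

// No coalesce: a code missing from lookUpMap, or a row that found no lookup at all, becomes null
val strictDs = joined
  .withColumn("value1", expr("lookUpMap[value1]"))
  .withColumn("value2", expr("lookUpMap[value2]"))

strictDs.show(false)
```

With this data, the two `id` 21 rows get `D` and `C`, while the `id` 22 row, which found no lookup row, gets null in both columns.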
Som
  • Thanks a lot, let me check it. What does coalesce do here? – BdEngineer Jul 29 '20 at 08:00
  • It won't replace the value if there is no lookup. – Som Jul 29 '20 at 08:01
  • Oh, thank you. If there is no lookup and I want to keep it as null, what then? – BdEngineer Jul 29 '20 at 09:42
  • Why this .over(Window.partitionBy("id")), i.e. over "id"? – BdEngineer Jul 29 '20 at 09:43
  • Can we rebroadcast newRateDS? It depends on the condition df("date").between(...), which changes quite often, so what is the best way to handle it in this case? – BdEngineer Jul 29 '20 at 10:31
  • Without .over(Window.partitionBy("id")) it is not working. How can I construct a map_from_entries from col("map_code"), col("map_val")? – BdEngineer Jul 29 '20 at 10:58
  • Too many queries to answer :( I think this has solved what was asked in the description: **How to do lookup**. Please use it as a reference. You may want to change a few things, like removing `coalesce` if you need `null` when no lookup value is available. – Som Jul 29 '20 at 11:38
  • Thanks a lot, and really sorry: when I remove .over(Window.partitionBy("id")) it gives an error, as it expects an aggregate column. – BdEngineer Jul 29 '20 at 14:19
  • What am I doing wrong here? Can you have a look, please? https://stackoverflow.com/questions/63224148/otherwise-clause-not-working-as-expect-whats-wrong-here – BdEngineer Aug 03 '20 at 05:25
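`collect_list` is an aggregate function, so without `.over(...)` it has to appear inside a `groupBy(...).agg(...)`; using it in `withColumn` with no window is presumably what triggers the aggregation error mentioned in the comments. Below is a sketch that builds the map with `groupBy` instead of a window, assuming Spark 2.4 in spark-shell; `lookupById` and `rates` are illustrative names:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// In spark-shell, getOrCreate() simply returns the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("groupby-lookup").getOrCreate()
import spark.implicits._

val df = List(
  ("20", "score", "school", "2018-03-31", 14, 12),
  ("21", "score", "school", "2018-03-31", 13, 13),
  ("22", "rate", "school", "2018-03-31", 11, 14),
  ("21", "rate", "school", "2018-03-31", 13, 12)
).toDF("id", "code", "entity", "date", "value1", "value2")

val rateDs = List(
  ("21", "2018-01-31", "2018-06-31", 12, "C"),
  ("21", "2018-01-31", "2018-06-31", 13, "D")
).toDF("id", "start_date", "end_date", "map_code", "map_val")

// One row per id and date range; collect_list aggregates the (map_code, map_val) pairs into a map
val lookupById = rateDs
  .groupBy("id", "start_date", "end_date")
  .agg(map_from_entries(collect_list(struct(col("map_code"), col("map_val")))).as("lookUpMap"))

val rates = df.filter(col("code") === "rate")

val resultDs = rates
  .join(
    broadcast(lookupById),
    lookupById("id") === rates("id") && rates("date").between(lookupById("start_date"), lookupById("end_date")),
    "left"
  )
  // coalesce keeps the original value (as a string) when there is no lookup
  .withColumn("value1", expr("coalesce(lookUpMap[value1], value1)"))
  .withColumn("value2", expr("coalesce(lookUpMap[value2], value2)"))

resultDs.show(false)
```

Grouping by `id`, `start_date` and `end_date` yields one lookup row per id and date range, so each `rate` row joins at most once per matching range: this gives one output row per input row instead of the duplicated rows in the expected output above. If the same id can have several date ranges with different codes, keying on the range also keeps those mappings apart, whereas `Window.partitionBy("id")` merges them into one map.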