
I have a DataFrame that represents co-occurrences of words, together with a value that represents the probability of their order.

If my sentence is "A B C D E", then my co-occurrence table looks like this (simplified example):

import org.apache.spark.sql.functions.monotonically_increasing_id

// "val" is a String here; it is cast to Double in the answer below.
val tempDF = spark.createDataFrame(Seq(
  (0, "A", "B", "1.2"),
  (1, "B", "A", "0.2"),
  (2, "B", "C", "2.2"),
  (3, "C", "B", "4.2"),
  (4, "D", "E", "1.2"),
  (5, "E", "D", "5.2")
)).toDF("id", "keyword1", "keyword2", "val")
  .withColumn("id", monotonically_increasing_id())

+---+---------+---------+---+
|id |keyword1 |keyword2 |val|
+---+---------+---------+---+
|0  |A        |B        |1.2|
|1  |B        |A        |0.2|
|2  |B        |C        |2.2|
|3  |C        |B        |4.2|
|4  |D        |E        |1.2|
|5  |E        |D        |5.2|
+---+---------+---------+---+

I know how to remove the duplicate pairs. In the case of A B and B A, I can do the following to keep either A B or B A when I don't care about the val column:

tempDF.where(tempDF("keyword2") < tempDF("keyword1"))
+---+---------+---------+---+
| id|keyword1 |keyword2 |val|
+---+---------+---------+---+
|  1|        B|        A|0.2|
|  3|        C|        B|4.2|
|  5|        E|        D|5.2|
+---+---------+---------+---+

The question is: what is the way (efficient, if possible) to keep, out of each pair and its reverse, the row with the greater val? This is my ideal result:

+---+---------+---------+---+
|id |keyword1 |keyword2 |val|
+---+---------+---------+---+
|0  |A        |B        |1.2|
|3  |C        |B        |4.2|
|5  |E        |D        |5.2|
+---+---------+---------+---+

Real example:

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "apache", "spark"))
)).toDF("id", "words").withColumn("id", monotonically_increasing_id())

The result after calculating co-occurrence (since there is only one sentence, the numbers are not meaningful; they differ on real data):

+--------+--------+-----+-------+---+
|Keyword1|Keyword2|coOcc|occKey |val|
+--------+--------+-----+-------+---+
|   about|      hi|    1|      1|1.0|
|   about|  apache|    1|      1|1.0|
|   about|   heard|    1|      1|1.0|
|   about|   spark|    1|      1|1.0|
|   about|       i|    1|      1|1.0|
|  apache|       i|    1|      1|1.0|
|  apache|   spark|    1|      1|2.0|
|  apache|      hi|    1|      1|1.0|
|  apache|   about|    1|      1|1.0|
|  apache|   heard|    1|      1|1.0|
|   heard|   spark|    1|      1|1.0|
|   heard|       i|    1|      1|1.0|
|   heard|  apache|    1|      1|1.0|
|   heard|      hi|    1|      1|1.0|
|   heard|   about|    1|      1|1.0|
|      hi|   spark|    1|      1|1.0|
|      hi|       i|    1|      1|2.0|
|      hi|   about|    1|      1|1.0|
|      hi|   heard|    1|      1|1.0|
|      hi|  apache|    1|      1|1.0|
|       i|      hi|    1|      1|1.0|
|       i|  apache|    1|      1|1.0|
|       i|   about|    1|      1|1.0|
|       i|   heard|    1|      1|1.0|
|       i|   spark|    1|      1|1.0|
|   spark|  apache|    1|      1|0.5|
|   spark|       i|    1|      1|1.0|
|   spark|      hi|    1|      1|1.0|
|   spark|   about|    1|      1|1.0|
|   spark|   heard|    1|      1|1.0|
+--------+--------+-----+-------+---+

I need to remove the duplicate pairs, keeping the ones with greater values: given (spark, apache, 0.5) and (apache, spark, 2.0), keep (apache, spark, 2.0) and remove the other.

UPDATE: As can be seen, there is always a row with (keyword1, keyword2, value) and a matching (keyword2, keyword1, value) in another row. Every row is unique because the table is generated by a groupBy. The answers to the following question didn't help me, since the table in that question always has the form (key1, key2, value): How to select the first row of each group?

I would need to know which row should be reversed, convert my (B, A, 0.2) to (A, B, 0.2), remove it as a duplicate, and then apply a window partition if I chose to use the answers in that question. (Even then, I would lose the direction, which I need if I use the result in a directed graph.)
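
For illustration, a minimal sketch of that canonicalize-then-window pipeline (the helper columns key_1/key_2 and the use of least/greatest are my assumptions, not columns of the table above). Because the pair key lives in separate helper columns, the surviving row keeps its original keyword1/keyword2 order, so the direction is not lost:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Build an order-independent pair key in helper columns, then keep the
// row with the highest val per pair; the kept row retains its direction.
val w = Window.partitionBy("key_1", "key_2").orderBy(col("val").cast("double").desc)

tempDF
  .withColumn("key_1", least(col("keyword1"), col("keyword2")))
  .withColumn("key_2", greatest(col("keyword1"), col("keyword2")))
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .drop("key_1", "key_2", "rn")
  .show()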

Maziyar
  • I have done tons of searches and tests. I saw the question you are referring to as well. It has (key1, key2) for the entire table; I have (key1, key2) in one row and always (key2, key1) in another row. I couldn't find a way that takes the max of those two and removes the duplicate. Unfortunately, I have tried all the answers in that question :( – Maziyar Dec 02 '18 at 19:17
  • I updated my question to explain more about why window/partition didn't work for me. – Maziyar Dec 02 '18 at 19:28

1 Answer


If you don't care about the direction:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for the $"..." column syntax

// Canonicalize each pair so that key_1 <= key_2, and cast val to Double.
val t2 = tempDF
  .withColumn("key_1", when($"keyword1" < $"keyword2", $"keyword1").otherwise($"keyword2"))
  .withColumn("key_2", when($"keyword1" < $"keyword2", $"keyword2").otherwise($"keyword1"))
  .withColumn("val", $"val".cast(DoubleType))

t2.groupBy($"key_1",$"key_2").max("val").show()
+-----+-----+--------+
|key_1|key_2|max(val)|
+-----+-----+--------+
|    B|    C|     4.2|
|    D|    E|     5.2|
|    A|    B|     1.2|
+-----+-----+--------+
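
As a side note (my addition, not part of the original answer), the two when/otherwise expressions can equivalently be written with the built-in least and greatest functions:

// Equivalent canonicalization using built-ins; same behavior as t2.
val t2b = tempDF
  .withColumn("key_1", least($"keyword1", $"keyword2"))
  .withColumn("key_2", greatest($"keyword1", $"keyword2"))
  .withColumn("val", $"val".cast(DoubleType))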

If you do care about the direction:

// For each canonical pair, take the max val and the first-seen val;
// if they differ, the reversed row had the larger val, so swap back.
val t3 = t2.groupBy($"key_1", $"key_2")
  .agg(first($"val").as("fval"), max($"val").as("max"))

t3.withColumn("k1", when($"fval" < $"max", $"key_2").otherwise($"key_1"))
  .withColumn("k2", when($"fval" < $"max", $"key_1").otherwise($"key_2"))
  .select($"k1", $"k2", $"max")
  .show()
+---+---+---+
| k1| k2|max|
+---+---+---+
|  C|  B|4.2|
|  E|  D|5.2|
|  A|  B|1.2|
+---+---+---+
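
One caveat worth noting (my observation, not from the original answer): first() inside an aggregation has no guaranteed row order, so fval only reflects the canonical direction if Spark happens to preserve the input order within each group. A sketch of a deterministic variant that keeps the whole winning row by taking the max over a struct:

// Struct fields compare left to right, so max picks the row with the
// largest val; its keyword1/keyword2 preserve the winning direction.
t2.withColumn("row", struct($"val", $"keyword1", $"keyword2"))
  .groupBy($"key_1", $"key_2")
  .agg(max($"row").as("row"))
  .select($"row.keyword1", $"row.keyword2", $"row.val")
  .show()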
Arnon Rotem-Gal-Oz
  • I've spent 2 days and tried almost everything on the internet. Still, I didn't even come close to your answer! You nailed it! Many thanks Arnon! – Maziyar Dec 03 '18 at 10:35