
How can I use the Spark DataFrame API to group by id, compute all pairwise value combinations within each group, and produce a single output DataFrame?

Example:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val testSchema = StructType(Array(
  StructField("id", IntegerType),
  StructField("value", StringType)))

val test_rows = Seq(
    Row(1, "a"),
    Row(1, "b"),
    Row(1, "c"),
    Row(2, "a"),
    Row(2, "d"),
    Row(2, "e")
)
val test_rdd = sc.parallelize(test_rows)
val test_df = sqlContext.createDataFrame(test_rdd, testSchema)

Expected output:

1 a b
1 a c
1 b c
2 a d
2 a e
2 d e

Best solution so far:

Perform a self-join, filter on id equality, and eliminate rows where the two values are equal:

import sqlContext.implicits._  // enables the $"colName" syntax

val result = test_df.join(
    test_df.select($"id".as("r_id"), $"value".as("r_value")),
    ($"id" === $"r_id") && ($"value" !== $"r_value")
  ).select("id", "value", "r_value")


+---+-----+-------+
| id|value|r_value|
+---+-----+-------+
|  1|    a|      b|
|  1|    a|      c|
|  1|    b|      a|
|  1|    b|      c|
|  1|    c|      a|
|  1|    c|      b|
|  2|    a|      d|
|  2|    a|      e|
|  2|    d|      a|
|  2|    d|      e|
|  2|    e|      a|
|  2|    e|      d|
+---+-----+-------+

Remaining problem: how do I eliminate duplicate pairs, e.g., (a,b) and (b,a), while performing the join?

    In this case it's better to use an `RDD` instead of a `DataFrame`; read this [Spark DataFrame Aggregation Function](http://stackoverflow.com/questions/33899977/spark-dataframe-custom-aggregation-function-to-sum-a-column-of-vectors) and you'll notice why. – Alberto Bonsanto Apr 09 '16 at 14:12

1 Answer

Do you have an ordering on the objects in the value field? If so, it seems like you could just join the DataFrame with itself, while requiring that the ids be identical and that the value from the left table be less than the value from the right table.
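A minimal sketch of that self-join, reusing test_df from the question and assuming the values are plain strings (Spark SQL compares string columns lexicographically):

import sqlContext.implicits._  // enables the $"colName" syntax

// Self-join on id; the strict < keeps exactly one ordering of each pair
// and also drops the (x, x) self-pairs.
val pairs = test_df.join(
    test_df.select($"id".as("r_id"), $"value".as("r_value")),
    ($"id" === $"r_id") && ($"value" < $"r_value")
  ).select("id", "value", "r_value")

pairs.show()
// +---+-----+-------+
// | id|value|r_value|
// +---+-----+-------+
// |  1|    a|      b|
// |  1|    a|      c|
// |  1|    b|      c|
// |  2|    a|      d|
// |  2|    a|      e|
// |  2|    d|      e|
// +---+-----+-------+
// (row order may vary)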

[edit] If you don't have an ordering, and you have sufficiently few values per id, another solution is to use groupByKey and then create all combinations from the resulting sequence; that can be done more simply than creating all pairs and then keeping only half. (If you're using Scala, for example, I believe Seq's `combinations` method will do what you need; see the sketch below.) This will perform much worse than the self-join approach for most datasets.
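For completeness, a rough sketch of that alternative, assuming each id has few enough values to collect into memory and that import sqlContext.implicits._ is in scope for toDF:

// Collect the values per id, then enumerate the 2-element combinations
// of each group; Seq.combinations(2) yields each unordered pair once.
val combosDF = test_df.rdd
  .map(row => (row.getInt(0), row.getString(1)))
  .groupByKey()
  .flatMap { case (id, values) =>
    values.toSeq.combinations(2).map { case Seq(a, b) => (id, a, b) }
  }
  .toDF("id", "value", "r_value")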

Matthew Gray
  • Unfortunately, the values are non-numeric, so I guess I'll have to eliminate set duplicates in a post-processing step. – behas Apr 07 '16 at 17:13
  • @behas: They don't need to be numeric to have an ordering. If they're strings, for example, you have a lexicographical ordering that you can use for comparisons. `"a"<"b"` will resolve to `true` and `"b"<"a"` will resolve to `false`. If they're objects with more complicated structure, comparing unique ids or string labels will also work. – Matthew Gray Apr 07 '16 at 21:10
  • no lexicographical ordering either...values are hashes – behas Apr 10 '16 at 07:59
  • @behas Is there a reason you can't compare the hash values? (You'll eliminate pairings where there's a collision, but you'd do that anyway with `groupByKey` and `combination`.) – Matthew Gray Apr 11 '16 at 17:01