
I'm looking for an efficient way of applying a map function to each pair of elements in a DataFrame, e.g.

records = spark.createDataFrame(
    [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')],
    ['id', 'val'])
records.show()

+---+---+
| id|val|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
|  4|  d|
+---+---+

I want to take values a, b, c, d and compare each of them with all the rest:

a -> b
a -> c
a -> d
b -> c
b -> d
c -> d

By comparison I mean a custom function that takes those 2 values and calculates some similarity index between them. Could you suggest an efficient way to perform this calculation, assuming the input DataFrame could contain tens of millions of elements?
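In other words, something like this self-join to enumerate each unordered pair once (just a sketch to show what I mean; the actual comparison would be my custom function):

from pyspark.sql import functions as F

# Enumerate each unordered pair exactly once by requiring l.id < r.id.
pairs = (records.alias("l")
    .join(records.alias("r"), F.col("l.id") < F.col("r.id"))
    .select(F.col("l.val").alias("left"), F.col("r.val").alias("right")))
pairs.show()  # yields (a,b), (a,c), (a,d), (b,c), (b,d), (c,d)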

Spark version 2.4.6 (AWS emr-5.31.0), using an EMR notebook with PySpark.


2 Answers


Collect the val column values into a lookup column, then compare each value from the lookup array with the val column.

Check the code below.

from pyspark.sql import functions as F

(records
    .select(
        F.collect_list(F.struct(F.col("id"), F.col("val"))).alias("data"),
        F.collect_list(F.col("val")).alias("lookup"))
    .withColumn("data", F.explode(F.col("data")))
    .select("data.*", F.expr("filter(lookup, v -> v != data.val)").alias("lookup"))
    # .withColumn("compare", F.expr("transform(lookup, v -> ...)"))  # add your comparison logic here
    .show())

+---+---+---------+
| id|val|   lookup|
+---+---+---------+
|  1|  a|[b, c, d]|
|  2|  b|[a, c, d]|
|  3|  c|[a, b, d]|
|  4|  d|[a, b, c]|
+---+---+---------+
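
To fill in the comparison, the commented line can apply any expression to each lookup element. For example, using the built-in levenshtein purely as a stand-in for your similarity function (assuming df holds the id/val/lookup frame built above):

from pyspark.sql import functions as F

# df is assumed to be the id/val/lookup DataFrame produced above.
# levenshtein is only a placeholder for the custom similarity.
df.withColumn("compare", F.expr("transform(lookup, v -> levenshtein(val, v))")).show()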

This is a cross join operation followed by a collect_list aggregation. If you want a's match list to contain only [b, c, d], you should apply that filter before doing the collect_list.

from pyspark.sql import functions as F

(records.alias("lhs")
    .crossJoin(records.alias("rhs"))    # self cross join
    .filter("lhs.val != rhs.val")       # drop self-matches before aggregating
    .groupBy("lhs.id", "lhs.val")
    .agg(F.collect_list("rhs.val").alias("lookup"))
    .show())
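
If you need a similarity score per pair rather than just the lookup list, you can compute it on the joined rows before aggregating. A sketch, again with the built-in levenshtein standing in for the custom function:

from pyspark.sql import functions as F

(records.alias("lhs")
    .crossJoin(records.alias("rhs"))
    .filter("lhs.val != rhs.val")
    .withColumn("sim", F.expr("levenshtein(lhs.val, rhs.val)"))  # placeholder similarity
    .groupBy("lhs.id", "lhs.val")
    .agg(F.collect_list(F.struct("rhs.val", "sim")).alias("lookup"))
    .show(truncate=False))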