
I have a spark dataframe (prof_student_df) that lists student/professor pairs for a timestamp. There are 4 professors and 4 students for each timestamp and each professor-student pair has a “score” (so there are 16 rows per time frame). For each time frame, I need to find the one-to-one pairing between professors and students that maximizes the overall score. Each professor can only be matched with one student for a single time frame.

For example, here are the pairings/scores for one time frame.

+------------+--------------+------------+-------+----------+
|    time    | professor_id | student_id | score | is_match |
+------------+--------------+------------+-------+----------+
| 1596048041 | p1           | s1         |   0.7 | FALSE    |
| 1596048041 | p1           | s2         |   0.5 | TRUE     |
| 1596048041 | p1           | s3         |   0.3 | FALSE    |
| 1596048041 | p1           | s4         |   0.2 | FALSE    |
| 1596048041 | p2           | s1         |   0.9 | TRUE     |
| 1596048041 | p2           | s2         |   0.1 | FALSE    |
| 1596048041 | p2           | s3         |  0.15 | FALSE    |
| 1596048041 | p2           | s4         |   0.2 | FALSE    |
| 1596048041 | p3           | s1         |   0.2 | FALSE    |
| 1596048041 | p3           | s2         |   0.3 | FALSE    |
| 1596048041 | p3           | s3         |   0.4 | FALSE    |
| 1596048041 | p3           | s4         |   0.8 | TRUE     |
| 1596048041 | p4           | s1         |   0.2 | FALSE    |
| 1596048041 | p4           | s2         |   0.3 | FALSE    |
| 1596048041 | p4           | s3         |  0.35 | TRUE     |
| 1596048041 | p4           | s4         |   0.4 | FALSE    |
+------------+--------------+------------+-------+----------+

The goal is to get this is_match column. It can be a boolean or a 0/1 bit or whatever works.

In the above example, p1 matched with s2, p2 matched with s1, p3 matched with s4 and p4 matched with s3 because that is the combination that maximized the total score (yields a score of 2.55). There is one weird edge case - it is possible to have LESS than 4 professors or students for a given time frame. If there are 4 professors and 3 students then 1 professor would be without a pairing and all of his is_match would be false. Similarly, if there are 3 professors and 4 students, 1 student would be without a pairing and all of his is_match would be false.
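For a quick sanity check of the 2.55 figure, here is a minimal brute-force sketch in plain Python (no Spark). It is only an illustration, not the approach from the answers: the helper name `best_pairing` is made up, it assumes every professor present has a score for every student present, and it only scales to small groups like the 4x4 case here (24 candidate pairings).

```python
from itertools import permutations

# Scores copied from the example time frame above: scores[professor][student].
scores = {
    "p1": {"s1": 0.7, "s2": 0.5, "s3": 0.3, "s4": 0.2},
    "p2": {"s1": 0.9, "s2": 0.1, "s3": 0.15, "s4": 0.2},
    "p3": {"s1": 0.2, "s2": 0.3, "s3": 0.4, "s4": 0.8},
    "p4": {"s1": 0.2, "s2": 0.3, "s3": 0.35, "s4": 0.4},
}

def best_pairing(scores):
    """Hypothetical helper: try every one-to-one pairing, keep the best total."""
    professors = sorted(scores)
    students = sorted({s for row in scores.values() for s in row})
    # Pad the shorter side with None so that someone can stay unmatched.
    size = max(len(professors), len(students))
    professors += [None] * (size - len(professors))
    students += [None] * (size - len(students))
    best_total, best_pairs = float("-inf"), []
    for perm in permutations(students):
        pairs = [(p, s) for p, s in zip(professors, perm)
                 if p is not None and s is not None]
        total = sum(scores[p][s] for p, s in pairs)
        if total > best_total:
            best_total, best_pairs = total, pairs
    return best_total, best_pairs

total, pairs = best_pairing(scores)
print(round(total, 2), pairs)
```

Dropping a student from every row (the 4 professors / 3 students edge case) leaves one professor out of the returned pairs, which corresponds to all of that professor's is_match values being false.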

Does anyone know how I might accomplish this? I am thinking I would partition or group by time and then feed the data into some UDF that spits out the pairings, and then maybe I would have to join that back to the original rows (although I am not sure). I am trying to implement this logic in pyspark and can use spark sql/sql or pyspark.

Ideally, I would like this to be as efficient as possible as there will be millions of rows. In the question, I mentioned a recursive algorithm because this is a traditional recursive type problem, but if there is a quicker solution that doesn't use recursion I am open to that.

many thanks, I am new to spark and a little stumped with how to do this.

EDIT: Clarifying the question, as I realize my example did not specify this: for a single day, there will be up to 14 professors and 14 students to choose from. I am just looking at one day at a time, which is why I didn't have the date in the dataframe. At any one time frame, there are at most 4 professors and 4 students. This dataframe just shows one time frame, but for the next time frame it is possible that the 4 professors are p5, p1, p7, p9 or something like that. The students might still be s1, s2, s3, s4.

Lauren Leder
  • I only see two ways of going about this: 1) a combination of window functions with array/higher-order functions (Spark 2.4+), 2) a pandas UDAF (Spark 2.3+). Your logic requires communication between the rows in the time frame (in order to ensure the max score outcome and to only use distinct student_ids in one time frame), and either way will be compute intensive. I think using array/higher-order functions will get too complicated and you're most likely better off with a pandas grouped map UDAF. My 2 cents – murtihash Jul 30 '20 at 03:00
  • Is the number of different combinations fixed to 16? – cronoik Jul 30 '20 at 15:26
  • @murtihash do you have any advice on how to do this with a pandas grouped map udaf? – Lauren Leder Jul 30 '20 at 20:14
  • @cronoik - there will be at most 4 students and 4 professors per row and for each row we calculate a value for a professor student pair. there could be less than 16 combinations if a professor/student is missing, but there will never be more. – Lauren Leder Jul 30 '20 at 20:15
  • You need to implement something like the [hungarian algorithm](https://en.wikipedia.org/wiki/Hungarian_algorithm). – cronoik Jul 31 '20 at 07:03
  • Yeah you could implement it using numpy inside pandas grouped map, n @cronoik is def onto something – murtihash Jul 31 '20 at 07:27
  • @cronoik I am new to this and have not used a pandas grouped map, do you have an example of how I would start this. sorry if I sound dumb, don't want to waste your time and dont expect you to do the work for me.. I can definitely get it going once I understand how it would work – Lauren Leder Jul 31 '20 at 20:37
  • @murtihash if you have any ideas that would be super helpful too, can only tag one user per comment. – Lauren Leder Jul 31 '20 at 20:37

2 Answers

Edit: As discussed in comments, to fix the issue mentioned in your update, we can convert student_id at each time into a generalized sequence id using dense_rank, go through Step 1 to 3 (using the student column) and then use a join to convert student at each time back to the original student_id; see Step-0 and Step-4 below. In case there are fewer than 4 professors in a time unit, the matrix will be resized to 4 rows on the NumPy end (using np.vstack() and np.zeros()); see the updated function find_assigned.

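You can try pandas_udf and scipy.optimize.linear_sum_assignment (note: in older SciPy releases such as 1.2.0 the backend method is the Hungarian algorithm mentioned by @cronoik in the main comments; newer releases may use a different algorithm for the same assignment problem), see below: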

from pyspark.sql.functions import pandas_udf, PandasUDFType, first, expr, dense_rank
from pyspark.sql.types import StructType
from scipy.optimize import linear_sum_assignment
from pyspark.sql import Window
import numpy as np

df = spark.createDataFrame([
    ('1596048041', 'p1', 's1', 0.7), ('1596048041', 'p1', 's2', 0.5), ('1596048041', 'p1', 's3', 0.3),
    ('1596048041', 'p1', 's4', 0.2), ('1596048041', 'p2', 's1', 0.9), ('1596048041', 'p2', 's2', 0.1),
    ('1596048041', 'p2', 's3', 0.15), ('1596048041', 'p2', 's4', 0.2), ('1596048041', 'p3', 's1', 0.2),
    ('1596048041', 'p3', 's2', 0.3), ('1596048041', 'p3', 's3', 0.4), ('1596048041', 'p3', 's4', 0.8),
    ('1596048041', 'p4', 's1', 0.2), ('1596048041', 'p4', 's2', 0.3), ('1596048041', 'p4', 's3', 0.35),
    ('1596048041', 'p4', 's4', 0.4)
] , ['time', 'professor_id', 'student_id', 'score'])

N = 4
cols_student = [*range(1,N+1)]

Step-0: add an extra column student, and create a new dataframe df3 with all unique combos of time + student_id + student.

w1 = Window.partitionBy('time').orderBy('student_id')

df = df.withColumn('student', dense_rank().over(w1))
+----------+------------+----------+-----+-------+                              
|      time|professor_id|student_id|score|student|
+----------+------------+----------+-----+-------+
|1596048041|          p1|        s1|  0.7|      1|
|1596048041|          p2|        s1|  0.9|      1|
|1596048041|          p3|        s1|  0.2|      1|
|1596048041|          p4|        s1|  0.2|      1|
|1596048041|          p1|        s2|  0.5|      2|
|1596048041|          p2|        s2|  0.1|      2|
|1596048041|          p3|        s2|  0.3|      2|
|1596048041|          p4|        s2|  0.3|      2|
|1596048041|          p1|        s3|  0.3|      3|
|1596048041|          p2|        s3| 0.15|      3|
|1596048041|          p3|        s3|  0.4|      3|
|1596048041|          p4|        s3| 0.35|      3|
|1596048041|          p1|        s4|  0.2|      4|
|1596048041|          p2|        s4|  0.2|      4|
|1596048041|          p3|        s4|  0.8|      4|
|1596048041|          p4|        s4|  0.4|      4|
+----------+------------+----------+-----+-------+

df3 = df.select('time','student_id','student').dropDuplicates()
+----------+----------+-------+                                                 
|      time|student_id|student|
+----------+----------+-------+
|1596048041|        s1|      1|
|1596048041|        s2|      2|
|1596048041|        s3|      3|
|1596048041|        s4|      4|
+----------+----------+-------+
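To make the effect of dense_rank concrete when the student set changes between time frames, here is a small plain-Python sketch of the same mapping. It is an illustration only: the helper name `dense_rank_per_time` and the second time frame (`1596048042` with s2, s7, s9) are made up, and the ranking simply follows the sorted order of the ids within each time frame.

```python
from collections import defaultdict

# (time, student_id) pairs; the second time frame is invented for illustration.
rows = [
    ("1596048041", "s1"), ("1596048041", "s2"),
    ("1596048041", "s3"), ("1596048041", "s4"),
    ("1596048042", "s2"), ("1596048042", "s7"), ("1596048042", "s9"),
]

def dense_rank_per_time(rows):
    """Hypothetical helper mimicking dense_rank() over
    Window.partitionBy('time').orderBy('student_id')."""
    ids_by_time = defaultdict(set)
    for time, student_id in rows:
        ids_by_time[time].add(student_id)
    return {
        (time, student_id): rank
        for time, ids in ids_by_time.items()
        for rank, student_id in enumerate(sorted(ids), start=1)
    }

mapping = dense_rank_per_time(rows)
for key in sorted(mapping):
    print(key, mapping[key])
```

Whatever the actual ids are, each time frame ends up with ranks starting at 1, so the pivot in the next step never needs more than N columns. The order is lexicographic on the id strings, which is fine here because the ranks are only used as temporary labels and are joined back to student_id at the end.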

Step-1: use pivot to build the matrix of professors vs students. Notice that we store the negated scores as the pivot values: scipy.optimize.linear_sum_assignment finds the minimum-cost assignment, so minimizing the negated scores maximizes the total score:

df1 = df.groupby('time','professor_id').pivot('student', cols_student).agg(-first('score'))
+----------+------------+----+----+-----+----+
|      time|professor_id|   1|   2|    3|   4|
+----------+------------+----+----+-----+----+
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|
+----------+------------+----+----+-----+----+

Step-2: use pandas_udf and scipy.optimize.linear_sum_assignment to get column indices and then assign the corresponding column name to a new column assigned:

# returnSchema contains one more StringType column `assigned` than schema from the input pdf:
schema = StructType.fromJson(df1.schema.jsonValue()).add('assigned', 'string')

# the pivot always yields N student columns, so we use np.vstack to pad the matrix with zero rows up to N*N
# below `n` is the number of professors/rows in pdf
# sz is the size of the input matrix, sz=4 in this example
def __find_assigned(pdf, sz):
  cols = pdf.columns[2:]                # student columns (after time, professor_id)
  n = pdf.shape[0]
  n1 = pdf.iloc[:,2:].fillna(0).values  # missing scores become 0
  _, idx = linear_sum_assignment(np.vstack((n1,np.zeros((sz-n,sz)))))
  return pdf.assign(assigned=[cols[i] for i in idx][:n])  # [:n] drops the padded rows

find_assigned = pandas_udf(lambda x: __find_assigned(x,N), schema, PandasUDFType.GROUPED_MAP)

df2 = df1.groupby('time').apply(find_assigned)
+----------+------------+----+----+-----+----+--------+
|      time|professor_id|   1|   2|    3|   4|assigned|
+----------+------------+----+----+-----+----+--------+
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|       3|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|       1|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|       2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|       4|
+----------+------------+----+----+-----+----+--------+

Note: per suggestion from @OluwafemiSule, we can use the parameter maximize instead of negating the score values. This parameter is available in SciPy 1.4.0+. With it, the scores must be left un-negated in Step-1 and Step-3:

  _, idx = linear_sum_assignment(np.vstack((n1,np.zeros((sz-n,sz)))), maximize=True)
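The padding logic can be illustrated without Spark, NumPy or SciPy. The sketch below is not the function from this answer: `assign_columns` is a made-up name, and a brute-force search over column orders stands in for linear_sum_assignment, which is only reasonable for tiny matrices such as the 4*4 case. It shows what happens with 3 professors and 4 students, using the negated scores of p1 to p3 from the example.

```python
from itertools import permutations

def assign_columns(cost_rows, size):
    """Hypothetical stand-in for the padding + assignment step.

    cost_rows: one list of `size` costs per professor (None = missing score).
    Returns the 0-based column index chosen for each real professor.
    """
    n = len(cost_rows)
    # Missing scores become 0, like fillna(0) in the answer.
    square = [[0.0 if c is None else c for c in row] for row in cost_rows]
    # Pad with all-zero rows up to size x size, like np.vstack with np.zeros.
    square += [[0.0] * size for _ in range(size - n)]
    # Brute force over column orders (24 candidates for size 4).
    best = min(
        permutations(range(size)),
        key=lambda cols: sum(square[r][c] for r, c in enumerate(cols)),
    )
    # Keep only the real professors, like the [:n] slice in the answer.
    return list(best[:n])

# Three professors, four students: negated scores for p1..p3 from the example.
costs = [
    [-0.7, -0.5, -0.3, -0.2],
    [-0.9, -0.1, -0.15, -0.2],
    [-0.2, -0.3, -0.4, -0.8],
]
print(assign_columns(costs, 4))  # prints [1, 0, 3]
```

The zero rows act as placeholder professors that cost nothing, so they absorb whichever student is left over (here the third column) without affecting the real pairings.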

Step-3: use the SparkSQL stack function to normalize (unpivot) the above df2, negate the score values back to positive and filter out rows whose score is NULL. The desired is_match column is true where assigned==student:

df_new = df2.selectExpr(
  'time',
  'professor_id',
  'assigned',
  'stack({},{}) as (student, score)'.format(len(cols_student), ','.join("int('{0}'), -`{0}`".format(c) for c in cols_student))
) \
.filter("score is not NULL") \
.withColumn('is_match', expr("assigned=student"))

df_new.show()
+----------+------------+--------+-------+-----+--------+
|      time|professor_id|assigned|student|score|is_match|
+----------+------------+--------+-------+-----+--------+
|1596048041|          p4|       3|      1|  0.2|   false|
|1596048041|          p4|       3|      2|  0.3|   false|
|1596048041|          p4|       3|      3| 0.35|    true|
|1596048041|          p4|       3|      4|  0.4|   false|
|1596048041|          p2|       1|      1|  0.9|    true|
|1596048041|          p2|       1|      2|  0.1|   false|
|1596048041|          p2|       1|      3| 0.15|   false|
|1596048041|          p2|       1|      4|  0.2|   false|
|1596048041|          p1|       2|      1|  0.7|   false|
|1596048041|          p1|       2|      2|  0.5|    true|
|1596048041|          p1|       2|      3|  0.3|   false|
|1596048041|          p1|       2|      4|  0.2|   false|
|1596048041|          p3|       4|      1|  0.2|   false|
|1596048041|          p3|       4|      2|  0.3|   false|
|1596048041|          p3|       4|      3|  0.4|   false|
|1596048041|          p3|       4|      4|  0.8|    true|
+----------+------------+--------+-------+-----+--------+

Step-4: use join to convert student back to student_id (use broadcast join if possible):

df_new = df_new.join(df3, on=["time", "student"])
+----------+-------+------------+--------+-----+--------+----------+            
|      time|student|professor_id|assigned|score|is_match|student_id|
+----------+-------+------------+--------+-----+--------+----------+
|1596048041|      1|          p1|       2|  0.7|   false|        s1|
|1596048041|      2|          p1|       2|  0.5|    true|        s2|
|1596048041|      3|          p1|       2|  0.3|   false|        s3|
|1596048041|      4|          p1|       2|  0.2|   false|        s4|
|1596048041|      1|          p2|       1|  0.9|    true|        s1|
|1596048041|      2|          p2|       1|  0.1|   false|        s2|
|1596048041|      3|          p2|       1| 0.15|   false|        s3|
|1596048041|      4|          p2|       1|  0.2|   false|        s4|
|1596048041|      1|          p3|       4|  0.2|   false|        s1|
|1596048041|      2|          p3|       4|  0.3|   false|        s2|
|1596048041|      3|          p3|       4|  0.4|   false|        s3|
|1596048041|      4|          p3|       4|  0.8|    true|        s4|
|1596048041|      1|          p4|       3|  0.2|   false|        s1|
|1596048041|      2|          p4|       3|  0.3|   false|        s2|
|1596048041|      3|          p4|       3| 0.35|    true|        s3|
|1596048041|      4|          p4|       3|  0.4|   false|        s4|
+----------+-------+------------+--------+-----+--------+----------+

df_new = df_new.drop("student", "assigned")
jxc
  • You could pass the `maximize` option to `linear_sum_assignment` and not have to explicitly use negative weights – Oluwafemi Sule Aug 05 '20 at 03:13
  • thank you @OluwafemiSule, I added a note with your suggestion. my server has SciPy version 1.2.0 which does not support this parameter, so just left the old logic as-is. – jxc Aug 05 '20 at 03:51
  • @jxc many thanks for your assistance here, this is awesome and I appreciate the thorough response as it is helping me walk through it. One quick question, and this might be my fault for not clarifying (I just clarified it in the question): will this solution work if the 4 professors and 4 students are not always the same? For example, for many time frames in a row it might be the same 4 professors and 4 students, but then there might be a new professor (`p5`) or a new student in the mix. There will always be only 4 professors and 4 students per time frame, but there are up to 14 profs and 14 students – Lauren Leder Aug 05 '20 at 17:36
  • @jxc the reason I realized that I don't think I clarified this/was wondering if it would still work was because I saw in step 1 as the last part we got a list of all students but that list would encompass students who were not considered in a particular time frame – Lauren Leder Aug 05 '20 at 17:42
  • @LaurenLeder, in the current method with step-1, the pivot will create a dataframe with columns for all 14 students for every professor/time, and the list cols_student will contain all 14 students, so the stack statement should be fine. The method should work, but with lots of NULLs in the calculation and post-filtering, so it might be less efficient. – jxc Aug 05 '20 at 18:14
  • One fix I can think of right now is to map student_id at each timeframe to a sequence of ids, so at each time, each professor_id will always have student_id from 1 to 4 regardless of their names, we can convert ids back to their names later by a join. will update my post next when I get time. @LaurenLeder – jxc Aug 05 '20 at 18:15
  • @jxc no problem, thanks for all of your assistance and insight on this matter. I can try this method and see if it ends up being very inefficient for now. i think of the students/professors as players on teams sitting on the bench or something. the combinations of which 4 students and which 4 professors might change by time frame. hockey is a good analogy bc a player might be in the penalty box on either team so it is possible to have 4 professors and 3 students (and then 1 prof is left without an assignment). not sure if I made this more confusing. – Lauren Leder Aug 05 '20 at 18:25
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/219292/discussion-between-jxc-and-lauren-leder). – jxc Aug 05 '20 at 20:22
  • @LaurenLeder, I adjusted the pandas_udf function to handle the case when the # of professors is less than 4. As for the NULL value issue, all missing values in the 4*4 matrix fed to linear_sum_assignment will be zeroes. Let me know if this works for your task. – jxc Aug 05 '20 at 21:39

As our friend @cronoik mentioned, you need to use the Hungarian algorithm. The best code I have seen for the unbalanced assignment problem in Python is: https://github.com/mayorx/hungarian-algorithm (there are also some examples in the repository :) )

You just need to convert your DataFrame into a NumPy array and pass it to the KM_Matcher, then add a column in Spark with the withColumn function, based on the answer from KM_Matcher.