
I have an assignment problem, and I wanted to ask the SO community the best way to implement it for my Spark DataFrame (Spark 3.1+). I will first describe the problem and then move to the implementation.

Here is the problem: I have up to N tasks and up to N individuals (in the case of this problem, N=10). Each individual has a cost for performing each task, where the minimum cost is $0 and the maximum cost is $10. It is essentially a Hungarian algorithm problem with some caveats.

  1. There will be some instances where there are fewer than 10 tasks and/or fewer than 10 individuals, and it is okay for someone not to be assigned a task (or for a task not to be assigned an individual).
  2. [The more complex edge case/the one I am having trouble with] - There may be one task in the list that has the flag multiTask=True (there cannot be more than 1 multiTask, and it is possible there are none). If a worker has a cost less than x for the multiTask, he is automatically assigned to the multiTask and the multiTask is considered taken during the optimization.
    • I will share a few examples (see the sketch after this list for this rule in code). In these examples, the threshold x for being assigned to the multiTask is 1.
      • If 1 worker out of 10 has a cost of .25 on the multiTask, he is assigned to the multiTask and then the other 9 workers will be assigned to the other 9 tasks
      • If 2 workers out of the 10 have a cost < 1 on the multiTask, both of them are assigned to the multiTask and then the other 8 workers will be assigned to 8 of the remaining 9 tasks. 1 task will not be assigned to anyone.
      • If all 10 workers have a cost < 1 on the multiTask, all of them are assigned to the multiTask. This is very rare but possible.
      • If no workers have a cost < 1 on the multiTask, the multiTask will only be assigned to one person during the optimization to minimize the cost.
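
To make rule 2 concrete, here is a minimal plain-Python sketch of just the pre-assignment logic (the helper name and dict shape are mine, purely for illustration):

# Sketch of the multiTask pre-assignment rule, outside of Spark.
# multi_task_costs maps workerId -> that worker's cost on the multiTask;
# x is the auto-assignment threshold.
def pre_assign_multi_task(multi_task_costs: dict, x: float) -> set:
    # every worker whose cost is strictly below x is auto-assigned
    return {worker for worker, cost in multi_task_costs.items() if cost < x}

# with x = 1 and the costs from the sample data below, workers 990 and 433
# are pre-assigned and worker 129 goes into the normal optimization
assert pre_assign_multi_task({129: 2.90, 990: 0.90, 433: 0.25}, x=1) == {990, 433}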

Here is what the Spark DataFrame looks like. Note: I am showing an example where N=3 (3 tasks, 3 individuals) for simplicity's sake.

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=220, cost=1.50, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=110, cost=2.90, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=190, cost=0.80, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=220, cost=1.80, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=110, cost=0.90, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=190, cost=9.99, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=220, cost=1.20, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=110, cost=0.25, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=190, cost=4.99, isMultiTask=False)
])

df = spark.createDataFrame(rdd)

You will see there is a date/location because I need to solve this assignment problem for every date/location grouping. My plan was to assign each worker and task an "index" based on their IDs using dense_rank(), then, in a pandas UDF, populate an N x N numpy array from those indices and invoke the linear_sum_assignment function. However, I don't believe this plan will work due to the 2nd edge case I laid out with the multiTask.

from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

worker_order_window = Window.partitionBy("date", "locationId").orderBy("workerId")
task_order_window = Window.partitionBy("date", "locationId").orderBy("taskId")

# dense_rank gives each worker/task an index within its date/location group,
# which will be its row/column in the np array for linear_sum_assignment
# (dense_rank - 1 because arrays are 0-indexed)
df = df.withColumn("worker_idx", dense_rank().over(worker_order_window) - 1)
df = df.withColumn("task_idx", dense_rank().over(task_order_window) - 1)


import numpy as np
import pandas as pd
from scipy.optimize import linear_sum_assignment


def linear_assignment_udf(pandas_df: pd.DataFrame) -> pd.DataFrame:
  df_dict = pandas_df.to_dict('records')
  # pad to a square N x N matrix in case there are fewer workers than tasks
  # (or vice versa); the indices are 0-based and contiguous, so max + 1 works
  N = max(pandas_df['worker_idx'].max(), pandas_df['task_idx'].max()) + 1
  arr = np.zeros((N, N))
  for row in df_dict:
    # worker_idx is the row number, task_idx is the column number
    arr[row['worker_idx']][row['task_idx']] = row['cost']
  rids, cids = linear_sum_assignment(arr)

  # return the original rows with an added column saying which task_idx
  # each worker was assigned
  return_list = []
  for r, c in zip(rids, cids):
    for d in df_dict:
      if d.get('worker_idx') == r:
        d['task_assignment'] = int(c)
        return_list.append(d)
  return pd.DataFrame(return_list)

from pyspark.sql.types import StructType
from pyspark.sql.functions import col, when

schema = StructType.fromJson(df.schema.jsonValue()).add('task_assignment', 'integer')
df = df.groupBy("date", "locationId").applyInPandas(linear_assignment_udf, schema)

df = df.withColumn("isAssigned", when(col("task_assignment") == col("task_idx"), True).otherwise(False))

As you can see, this code does not handle the multiTask at all. I would like to solve this in the most efficient way possible, so I am not tied to pandas UDFs or scipy.

Lauren Leder

1 Answer


I don't know anything about the libraries you're using so I can't help you with the code, but I think you should do this in two steps:

  1. Assign workers to the multiTask if they are required to be assigned to it. If anyone is assigned to this task, don't include that task (or those workers) in your cost matrix.
  2. Assign workers to tasks using the Hungarian Algorithm (or some alternative) as normal.

The basic Hungarian Algorithm only works on square cost matrices, and it looks like you've handled that correctly by padding your cost matrix with 0's, but there are modifications of the algorithm that work with rectangular matrices. You may want to see if you have access to one of those alternatives as it could be significantly faster.
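
I can't speak to the Spark side, but here is a library-agnostic sketch of the two steps in plain Python/numpy (I'm using scipy's linear_sum_assignment as the step-2 solver since you mentioned it; treat it as a stand-in for whatever solver you end up with):

import numpy as np
from scipy.optimize import linear_sum_assignment

def assign(costs: np.ndarray, multi_task, x):
    """costs[w][t] is worker w's cost on task t; multi_task is the column
    index of the multiTask (or None); x is the auto-assignment threshold."""
    workers = list(range(costs.shape[0]))
    tasks = list(range(costs.shape[1]))
    assignments = {}

    # Step 1: pre-assign anyone under the threshold to the multiTask, then
    # drop those workers and the multiTask itself from the problem.
    if multi_task is not None:
        auto = [w for w in workers if costs[w, multi_task] < x]
        for w in auto:
            assignments[w] = multi_task
        if auto:
            workers = [w for w in workers if w not in auto]
            tasks = [t for t in tasks if t != multi_task]

    # Step 2: solve the remaining problem. Note that scipy's
    # linear_sum_assignment accepts rectangular matrices directly, so no
    # zero-padding is needed here.
    sub = costs[np.ix_(workers, tasks)]
    rids, cids = linear_sum_assignment(sub)
    for r, c in zip(rids, cids):
        assignments[workers[r]] = tasks[c]
    return assignments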

Yay295
  • Yeah, that's the approach I am planning to go with. However, the trouble I am having is how best to implement it efficiently, because if I assign everyone an index beforehand and then have to remove the people assigned to the multiTask (and potentially the multiTask itself), their rows/columns will be all 0 – Lauren Leder Sep 23 '21 at 14:24
  • I could instead assign the indices only after I see whether anyone is assigned to the multiTask, but that feels like a lot of looping – Lauren Leder Sep 23 '21 at 14:24
  • The Hungarian Algorithm is O(n^3) or O(n^4) depending on the implementation. An extra loop or two outside of that algorithm in order to reduce the size of n is a good tradeoff. – Yay295 Sep 23 '21 at 14:29