
I have a dataframe, and after adding a rank column I can split it into several dataframes based on the number of ranks:

rankedDF :

job_id  task_id  rating  proba  rank
1       111      1       0.7    1
1       111      2       0.3    1
1       122      4       0.9    2
1       122      7       0.1    2
1       133      3       0.6    3
1       133      1       0.4    3

To create multiple dataframes :

val numberRanks = rankedDF.select("rank").distinct().count().toInt

// create one dataframe per rank
val rankDFs = for (i <- 1 to numberRanks) yield {
  rankedDF.filter(col("rank") === i)
}

Then I join the dataframes together and create arrays that combine the ratings, combine the task_ids, and multiply the probas:

// join the dataframes pairwise
val joinedDFs = rankDFs.reduce((df1, df2) =>
  df1.join(df2, Seq("job_id"))
    .withColumn("combination_ratings", array(col("rating"), col("rating")))
    .withColumn("combination_task", array(col("task_id"), col("task_id")))
    .withColumn("final_proba", col("proba") * col("proba"))
).select("job_id", "combination_task", "combination_ratings", "final_proba")

The intermediate result, just after the join and before creating the combination arrays, is:

job_id  task_id  rating  proba  task_id  rating  proba  task_id  rating  proba
1       111      1       0.1    122      3       0.7    133      3       0.6
1       111      2       0.3    122      4       0.4    133      1       0.2

After combination, the result should be something like this:

job_id  combination_task  combination_ratings  final_proba
1       [111, 122, 133]   [1, 4, 3]            0.378
1       [111, 122, 133]   [2, 7, 1]            0.012
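For reference, the combination logic being targeted (pick one task per rank, collect the task_ids and ratings, multiply the probas) can be sketched outside Spark with plain Python; the `rows` list and column order below are assumptions based on the sample data above:

```python
from itertools import product

# (job_id, task_id, rating, proba, rank) rows from the sample dataframe
rows = [
    (1, 111, 1, 0.7, 1), (1, 111, 2, 0.3, 1),
    (1, 122, 4, 0.9, 2), (1, 122, 7, 0.1, 2),
    (1, 133, 3, 0.6, 3), (1, 133, 1, 0.4, 3),
]

# group rows by rank, then take the cartesian product across ranks
by_rank = {}
for job, task, rating, proba, rank in rows:
    by_rank.setdefault(rank, []).append((task, rating, proba))

combos = []
for picks in product(*(by_rank[r] for r in sorted(by_rank))):
    tasks = [t for t, _, _ in picks]       # combination_task
    ratings = [r for _, r, _ in picks]     # combination_ratings
    final = 1.0
    for _, _, p in picks:                  # final_proba = product of probas
        final *= p
    combos.append((tasks, ratings, round(final, 6)))
```

The two rows in the expected output above ([1, 4, 3] with 0.378 and [2, 7, 1] with 0.012) are two of the 2 × 2 × 2 = 8 combinations this produces.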

But I get this error:

reference 'rating' is ambiguous, could be: rating, rating

Ps: I also tried aliasing the dataframes in the join expression, but the error was the same.

Ib D
  • Does this answer your question? [spark join causing column id ambiguity error](https://stackoverflow.com/questions/55511560/spark-join-causing-column-id-ambiguity-error) – Gaël J May 14 '23 at 14:28
  • Ps : I also tried to aliasing dataframes in the join expression but the error was same – Ib D May 14 '23 at 14:34
  • @GaëlJ, no, this does not answer my question; I also get the same error: `Reference 'a.rating' is ambiguous, could be: a.rating, a.rating` – Ib D May 14 '23 at 14:38
  • @GaëlJ, the difference here is the number of dataframes to join: in my example it is 3 dataframes, but it could be 4 or more. The idea is to join the dataframes two by two to get the final dataframe and the arrays – Ib D May 14 '23 at 14:42

1 Answer


Taking a step back and looking at the final result you need, it seems you just need to group by job_id and use collect_set/collect_list instead of splitting into multiple dataframes and joining them, something like this:

import org.apache.spark.sql.functions._

val combinedRanksByJob = rankedDF.groupBy("job_id").agg(
    collect_set("task_id").as("combination_task"),
    collect_set("rating").as("combination_ratings"),
    // aggregate is a higher-order function over an array column (Spark 3.0+),
    // so collect the probas first, then fold them into a product
    aggregate(collect_list("proba"), lit(1.0), (acc, nxt) => acc * nxt).as("final_proba")
)

combinedRanksByJob.show(false)
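As a sanity check on the sample data, what this aggregation computes per job_id can be modelled in plain Python (a sketch; the row data is taken from the question):

```python
# (job_id, task_id, rating, proba) rows for the single job in the sample
rows = [
    (1, 111, 1, 0.7), (1, 111, 2, 0.3),
    (1, 122, 4, 0.9), (1, 122, 7, 0.1),
    (1, 133, 3, 0.6), (1, 133, 1, 0.4),
]

tasks = sorted({t for _, t, _, _ in rows})    # collect_set("task_id")
ratings = sorted({r for _, _, r, _ in rows})  # collect_set("rating") de-duplicates
final = 1.0
for _, _, _, p in rows:                       # fold over every proba in the group
    final *= p
```

Note that collect_set de-duplicates (rating 1 appears in two rows but once in the set) and the product folds over all rows of the group, yielding one aggregated row per job_id.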
Islam Elbanna