
My question is not a duplicate of "Joining Spark Dataframes with "isin" operator". That question is about "is in"; mine is about "not in", which is different.

I have two Datasets:

  • userProfileDataset: Dataset[UserProfile]
  • jobModelsDataset: Dataset[JobModel]

Case class UserProfile is defined as

case class UserProfile(userId: Int, visitedJobIds: Array[Int])

and case class JobModel is defined as

case class JobModel(JobId: Int, Model: Map[String, Double])

I have also made two objects (UserProfileFieldNames and JobModelFieldNames) that contain the field names of these two case classes.

My objective is, for each user in userProfileDataset, to find the JobModel.JobIds that are NOT contained in UserProfile.visitedJobIds. How can I do this?

I've thought about using a crossJoin followed by a filter. It may work. Is there a more direct or efficient way?


I have tried the following approaches, but none of them worked:

val result = userProfileDataset.joinWith(jobModelsDataset,
      !userProfileDataset.col(UserProfileFieldNames.visitedJobIds).contains(jobModelsDataset.col(JobModelFieldNames.jobId)),
      "left_outer"
    )

It leads to:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'contains(_1.visitedJobIds, CAST(_2.JobId AS STRING))' due to data type mismatch: argument 1 requires string type, however, '_1.visitedJobIds' is of array type.;;

Could it be because the contains method can only be used to test whether one string contains another string?

The following condition also didn't work:

!jobModelsDataset.col(JobModelFieldNames.jobId)
  .isin(userProfileDataset.col(UserProfileFieldNames.visitedJobIds))

It leads to:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '(_2.JobId IN (_1.visitedJobIds))' due to data type mismatch: Arguments must be same type but were: IntegerType != ArrayType(IntegerType,false);; 'Join LeftOuter, NOT _2#74.JobId IN (_1#73.visitedJobIds)

CyberPlayerOne
  • It sounds like the number of JobIds is quite low. If that is the case, collecting all unique JobIds and then comparing would be a possible approach. – Shaido Feb 06 '18 at 05:38
  • @Shaido Do you mean broadcast the collected JobIds and then compare with the `visitedJobIds` column? How to compare an Array[Int] with a column then? Thanks. – CyberPlayerOne Feb 06 '18 at 06:34

3 Answers

1

If there are not too many unique job ids, you can collect and broadcast them as follows

val jobIds = jobModelsDataset.map(_.JobId).distinct.collect().toSeq
val broadcastedJobIds = spark.sparkContext.broadcast(jobIds)

To compare this broadcasted sequence with the visitedJobIds column you can create a UDF

val notVisited = udf((visitedJobs: Seq[Int]) => { 
  broadcastedJobIds.value.filterNot(visitedJobs.toSet)
})

val df = userProfileDataset.withColumn("jobsToDo", notVisited($"visitedJobIds"))

Testing with jobIds = 1,2,3,4,5 and an example dataframe

+------+---------------+
|userId|  visitedJobIds|
+------+---------------+
|     1|      [1, 2, 3]|
|     2|      [3, 4, 5]|
|     3|[1, 2, 3, 4, 5]|
+------+---------------+

will give a final dataframe as

+------+---------------+--------+
|userId|  visitedJobIds|jobsToDo|
+------+---------------+--------+
|     1|      [1, 2, 3]|  [4, 5]|
|     2|      [3, 4, 5]|  [1, 2]|
|     3|[1, 2, 3, 4, 5]|      []|
+------+---------------+--------+
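
If one row per (user, unvisited job) pair is needed instead of one array per user, the jobsToDo column can be exploded. The following is only a sketch, assuming a local SparkSession and the sample rows from the table above; the object name ExplodeJobsToDo and the helper toPairs are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

object ExplodeJobsToDo {
  // Turns one row per user (with an array column) into one row per (userId, jobId) pair.
  // Users whose jobsToDo array is empty produce no rows.
  def toPairs(df: DataFrame): DataFrame =
    df.select(col("userId"), explode(col("jobsToDo")).as("jobId"))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for illustration only.
    val spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate()
    import spark.implicits._

    // Same sample rows as in the table above.
    val df = Seq(
      (1, Seq(1, 2, 3), Seq(4, 5)),
      (2, Seq(3, 4, 5), Seq(1, 2)),
      (3, Seq(1, 2, 3, 4, 5), Seq.empty[Int])
    ).toDF("userId", "visitedJobIds", "jobsToDo")

    toPairs(df).show()
    spark.stop()
  }
}
```

Note that explode drops user 3, whose array is empty. If such users should be kept with a null jobId, explode_outer (available since Spark 2.2) can be used instead.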
Shaido
1

You can simply explode the array column of userProfileDataset and cast it to IntegerType, then join it with jobModelsDataset's JobId column, which is already an IntegerType. Finally, use the built-in collect_list function to get the final result.

Exploding and casting would be as below

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val temp = userProfileDataset.withColumn("visitedJobIds", explode(col("visitedJobIds")))
    .withColumn("visitedJobIds", col("visitedJobIds").cast(IntegerType))

Joining and collecting would be as below

temp.join(jobModelsDataset, temp("visitedJobIds") === jobModelsDataset("JobId"), "left")
      .groupBy("userId")
      .agg(collect_list("visitedJobIds").as("visitedJobIds"), collect_list("JobId").as("ModelJobIds"))
    .show(false)

You should get what you are looking for.

Updated

If you are looking for JobIds that each userId is not associated with then you can do as below.

val list = jobModelsDataset.select(collect_list("JobId")).rdd.first()(0).asInstanceOf[collection.mutable.WrappedArray[Int]]
def notContained = udf((array: collection.mutable.WrappedArray[Int]) => list.filter(x => !(array.contains(x))))
temp.join(jobModelsDataset, temp("visitedJobIds") === jobModelsDataset("JobId"), "left")
      .groupBy("userId")
      .agg(collect_list("visitedJobIds").as("visitedJobIds"), collect_list("JobId").as("ModelJobIds"))
      .withColumn("ModelJobIds", notContained(col("ModelJobIds")))
    .show(false)

You can improve the answer by broadcasting.
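
One way the broadcasting could look is sketched below. It is not the exact code of this answer: it applies the UDF directly to visitedJobIds instead of going through the join, and the names BroadcastNotContained, withUnvisited and listBc, as well as the sample data, are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object BroadcastNotContained {
  // Adds a "ModelJobIds" column holding the JobIds that are absent from visitedJobIds.
  def withUnvisited(spark: SparkSession, users: DataFrame, jobModels: DataFrame): DataFrame = {
    import spark.implicits._
    // Collect the distinct JobIds on the driver; sorted only to make the output stable.
    val list: Array[Int] = jobModels.select("JobId").as[Int].distinct().collect().sorted
    // Broadcast the array so it is shipped to each executor once rather than with every task.
    val listBc = spark.sparkContext.broadcast(list)
    val notContained = udf { (visited: Seq[Int]) =>
      val seen = visited.toSet // a Set makes each membership test cheap
      listBc.value.filterNot(seen).toSeq
    }
    users.withColumn("ModelJobIds", notContained(col("visitedJobIds")))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session and made-up sample data.
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-sketch").getOrCreate()
    import spark.implicits._

    val jobModels = Seq(1, 2, 3, 4, 5).toDF("JobId")
    val users = Seq((1, Seq(1, 2, 3)), (2, Seq(3, 4, 5))).toDF("userId", "visitedJobIds")

    withUnvisited(spark, users, jobModels).show(false)
    spark.stop()
  }
}
```

The sketch assumes visitedJobIds is never null; a null array would need an explicit check inside the UDF.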

Ramesh Maharjan
  • I want to find the jobs in `jobModelsDataset` that are NOT visited by each user according to the `visitedJobIds` column of `userProfileDataset`. So I guess I should change `===` in your code to `=!=`. However, that probably cannot achieve the original objective. – CyberPlayerOne Feb 06 '18 at 08:23
  • @Tyler提督九门步军巡捕五营统领, I have updated my answer :) please test it :) – Ramesh Maharjan Feb 06 '18 at 09:12
  • What difference does it make here whether `list` is broadcast or not? It seems that in your code `list` is collected by the Driver? – CyberPlayerOne Feb 07 '18 at 05:58
  • yes list should be collected to get one list. broadcasting will help to use memory efficiently as only the requested data from broadcasted variable will be distributed to executors and not all of them. – Ramesh Maharjan Feb 07 '18 at 06:10
0

Originally I had another approach which uses crossJoin and then filter:

val result = userProfileDataset
  .crossJoin(jobModelsDataset) // 27353040 rows
  // Look the columns up by name so the filter does not depend on column positions
  .filter(row => !row.getAs[Seq[Int]]("visitedJobIds").contains(row.getAs[Int]("JobId"))) // 27352633 rows

If I use @Shaido's approach and then explode, I should obtain the same result as with this approach. However, this approach is very expensive in my case, even with the filter (I have compared the elapsed time). The explain method can also be used to print the physical plan.

So I will not use the crossJoin approach. I just want to post and keep it here.
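
For reference, the same cross join and filter can also be written with a Column predicate instead of a typed lambda, using the SQL function array_contains through expr. This is only a sketch on made-up sample data; the object name CrossJoinArrayContains and the helper unvisited are hypothetical. It is still logically a cross join, so I would not assume it is cheaper without measuring.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

object CrossJoinArrayContains {
  // Keeps every (user, job) pair whose JobId is absent from the user's visitedJobIds.
  def unvisited(users: DataFrame, jobs: DataFrame): DataFrame =
    users.crossJoin(jobs).filter(expr("NOT array_contains(visitedJobIds, JobId)"))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session and made-up sample data.
    val spark = SparkSession.builder().master("local[*]").appName("crossjoin-sketch").getOrCreate()
    import spark.implicits._

    val users = Seq((1, Seq(1, 2, 3)), (2, Seq(3, 4, 5))).toDF("userId", "visitedJobIds")
    val jobs = Seq(1, 2, 3, 4, 5).toDF("JobId")

    val result = unvisited(users, jobs)
    result.explain() // prints the physical plan
    result.show()
    spark.stop()
  }
}
```

Because the predicate is an expression rather than a Scala function, the optimizer can see it, and explain shows how it ends up in the plan.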

CyberPlayerOne