
In Spark's older RDD-based API, you can "cogroup" up to 3 other RDDs with your original RDD in a single cogroup call, provided they are all pair RDDs with the same key type.

In the newer Dataset API, it seems that I have to call groupByKey twice and cogroup once for every additional dataset I want to group with.

For example, suppose I have a dataset of recent activity and two 'metadata' datasets that provide context for that activity, and I want to cogroup all three. (The history and profile can be large structures, so joining them with the activity would produce an unreasonably large dataset.) Here's what I do today:

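// These data types all have an "id" field for correlation.
// Assumes a SparkSession named `spark` is in scope; its implicits supply the encoders.
import spark.implicits._

val activityData: Dataset[Activity] = getActivity()
val locationHistory: Dataset[LocationHistory] = getLocationHistory()
val profiles: Dataset[Profile] = getProfiles()

// This first cogroup aligns activity with location history.
// The function has to return a collection of rows, and the iterators are
// converted with toSeq because Spark has no encoder for Iterator.
val partialGroup = activityData.groupByKey(_.id)
    .cogroup(locationHistory.groupByKey(_.id)) {
        case (id, activity, location) if activity.nonEmpty =>
            Iterator((id, activity.toSeq, location.toSeq))

        case _ => Iterator.empty
    }

// This second cogroup adds the profile to complete the grouping
val fullGroup = partialGroup.groupByKey(_._1)
    .cogroup(profiles.groupByKey(_.id)) {
        case (id, activityAndLocation, profile) =>
            // Convert once: an Iterator can only be traversed a single time
            val profileSeq = profile.toSeq
            activityAndLocation.map { case (_, activity, location) =>
                (id, activity, location, profileSeq)
            }
    }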

It seems a bit long for what would be essentially a one-liner in the RDD API. Is there another way to accomplish this in the Dataset API that doesn't require as much repetition?
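
For comparison, here is a rough sketch of the RDD version I have in mind, written so it can be pasted into spark-shell. The case class fields other than `id` (`action`, `places`, `name`), the sample rows, and the local SparkSession are all made up for illustration. It uses the three-way `cogroup` from `PairRDDFunctions`, then converts back to a Dataset:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical stand-ins for the real types; only the shared "id" field matters.
case class Activity(id: Long, action: String)
case class LocationHistory(id: Long, places: Seq[String])
case class Profile(id: Long, name: String)

// Local session for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cogroup-sketch")
  .getOrCreate()
import spark.implicits._

// Made-up sample data
val activityData: Dataset[Activity] =
  Seq(Activity(1L, "login"), Activity(1L, "search"), Activity(2L, "login")).toDS()
val locationHistory: Dataset[LocationHistory] =
  Seq(LocationHistory(1L, Seq("home", "work"))).toDS()
val profiles: Dataset[Profile] =
  Seq(Profile(1L, "a"), Profile(3L, "c")).toDS()

// A single cogroup call over the underlying pair RDDs, keyed by id.
// Keys without any activity are dropped, as in the Dataset version.
val fullGroup: Dataset[(Long, Seq[Activity], Seq[LocationHistory], Seq[Profile])] =
  activityData.rdd.keyBy(_.id)
    .cogroup(locationHistory.rdd.keyBy(_.id), profiles.rdd.keyBy(_.id))
    .filter { case (_, (activity, _, _)) => activity.nonEmpty }
    .map { case (id, (activity, location, profile)) =>
      (id, activity.toSeq, location.toSeq, profile.toSeq)
    }
    .toDS()
```

The grouping itself is the single `cogroup` line; everything else is setup and conversion. The cost is the round trip through `.rdd` and back, which is exactly what I would like to avoid.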

Matt
  • Related question regarding `cogroup` on dataframes: https://stackoverflow.com/questions/36513574/cogroup-on-spark-dataframes – Shaido Sep 07 '18 at 02:10
  • A workaround that would require less code would be to convert all three dataframes into RDDs with `.rdd`, do the `cogroup`, and then use `toDF()` to get the final dataframe. Otherwise, maybe you can look into whether the more commonly used `join` can solve the same problem. – Shaido Sep 07 '18 at 02:12
  • @Shaido converting to RDDs and back couldn't really be considered an improvement, even if it did save a few lines of code. – Matt Sep 07 '18 at 04:44
