In Spark's old RDD-based API, it was possible to cogroup up to three other RDDs with your original RDD in a single call, assuming they were all pair RDDs with the same key.
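For comparison, the RDD version is roughly the following (the pair RDDs here are hypothetical, mirroring the datasets below, and I'm assuming String ids):
import org.apache.spark.rdd.RDD

val activityRdd: RDD[(String, Activity)] = ???
val locationRdd: RDD[(String, LocationHistory)] = ???
val profileRdd: RDD[(String, Profile)] = ???

// One call groups all three sources by key, producing
// RDD[(String, (Iterable[Activity], Iterable[LocationHistory], Iterable[Profile]))]
val grouped = activityRdd.cogroup(locationRdd, profileRdd)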
In the new Dataset API, it seems that I have to call groupByKey twice and cogroup once for each dataset I want to group with.
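As far as I can tell, that's because cogroup is defined on KeyValueGroupedDataset, so both sides have to be grouped before every call. Slightly simplified, the signature is:

// On KeyValueGroupedDataset[K, V] -- both the receiver and `other`
// must already be grouped, hence all the groupByKey calls below
def cogroup[U, R : Encoder](other: KeyValueGroupedDataset[K, U])(
    f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R]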
For example, imagine I have a dataset of recent activity and two 'metadata' datasets that provide context for it. I want to cogroup all three rather than join them, because the history and profile can be large structures, and joining them with the activity would make the result unreasonably large. Here's what I do today:
// Assumes a SparkSession `spark` in scope for the tuple encoders
import spark.implicits._

// These data types all have an "id" field for correlation
val activityData: Dataset[Activity] = getActivity()
val locationHistory: Dataset[LocationHistory] = getLocationHistory()
val profiles: Dataset[Profile] = getProfiles()

// This first cogroup aligns activity with location history. The
// iterators passed to the function are only valid inside it, and the
// function must return a TraversableOnce, so each group is
// materialized with toSeq and wrapped in a Seq (empty to drop keys
// that have no activity).
val partialGroup = activityData.groupByKey(_.id)
  .cogroup(locationHistory.groupByKey(_.id)) { (id, activity, location) =>
    if (activity.nonEmpty) Seq((id, activity.toSeq, location.toSeq))
    else Seq.empty
  }

// This second cogroup adds the profile to complete the grouping
val fullGroup = partialGroup.groupByKey(_._1)
  .cogroup(profiles.groupByKey(_.id)) { (id, activityAndLocation, profile) =>
    val profileSeq = profile.toSeq
    activityAndLocation.map { case (_, activity, location) =>
      (id, activity, location, profileSeq)
    }
  }
It seems a bit long for what would be essentially a one-liner in the RDD API. Is there another way to accomplish this in the Dataset API that doesn't require as much repetition?