I have a function that operates on a list, but it causes performance problems, and I would like to rewrite it to run entirely on a Dataset. The Dataset API appears to offer methods similar to the List API, so a painless translation should be possible, but I can't arrive at a version that compiles.
def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = {
  val productsDatasetAsList = products.as[AggregateWithSupplierProducts].collect().toList
  val duplicates = productsDatasetAsList
    .groupBy(_.products.sortBy(_.product.productId))
    .filter(_._2.size > 1)
    .values
    .toList
  mapToDataset(duplicates)
}
Right from the start, the RelationalGroupedDataset returned by the Dataset's groupBy prevents me from conveniently translating the rest of the code.
Data structure:
case class AggregateWithSupplierProducts(
  id: String,
  products: List[Product])
case class Product(productId: String, productCount: Int)
case class DuplicatesByIds(duplicates: List[String])
Example of data:
[ {
"id": "ID1",
"products": [
{
"product": {
"productId": "SOME_ID",
"productCount": 1
}
},
{
"product": {
"productId": "SOME_OTHER_ID",
"productCount": 1
}
}
]
},
{
"id": "ID2",
"products": [
{
"product": {
"productId": "SOME_ID",
"productCount": 1
}
},
{
"product": {
"productId": "SOME_OTHER_ID",
"productCount": 1
}
}
]
},
{
"id": "ID3",
"products": [
{
"product": {
"productId": "DIFFERENT_ID",
"productCount": 1
}
},
{
"product": {
"productId": "SOME_OTHER_ID",
"productCount": 1
}
}
]
},
{
"id": "ID4",
"products": [
{
"product": {
"productId": "SOME_OTHER_ID",
"productCount": 1
}
},
{
"product": {
"productId": "DIFFERENT_ID",
"productCount": 1
}
}
]
},
{
"id": "ID5",
"products": [
{
"product": {
"productId": "NOT_DUPLICATED_ID",
"productCount": 1
}
},
{
"product":
"productId": "DIFFERENT_ID",
"productCount": 2
}
}
]
}
]
The result of this would be a Dataset containing:
DuplicatesByIds(List("ID1", "ID2")),
DuplicatesByIds(List("ID3", "ID4"))
The code works properly with the Dataset collected to a list, but I am having considerable trouble translating it to run fully on the Dataset, without wasting memory.
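For completeness, here is a self-contained version of the list-based logic I am trying to port, runnable without Spark. ProductWrapper is a name I've added here for the extra nesting level visible in the example JSON (the `"product": { ... }` objects); the grouping key and filter are the same as in my function above:

```scala
// Simplified model mirroring the case classes in the question.
// ProductWrapper is a hypothetical name for the nesting in the example data.
case class Product(productId: String, productCount: Int)
case class ProductWrapper(product: Product)
case class Aggregate(id: String, products: List[ProductWrapper])
case class DuplicatesByIds(duplicates: List[String])

def findDuplicates(aggregates: List[Aggregate]): List[DuplicatesByIds] =
  aggregates
    // Key each aggregate by its product list sorted by productId, so that
    // ordering differences between aggregates do not hide duplicates.
    .groupBy(_.products.sortBy(_.product.productId))
    // Keep only keys shared by more than one aggregate.
    .filter(_._2.size > 1)
    .values
    // Each surviving group becomes one DuplicatesByIds with the aggregate ids.
    .map(group => DuplicatesByIds(group.map(_.id)))
    .toList
```

On the example data this yields exactly the two groups listed above, while ID5 (whose DIFFERENT_ID has productCount 2) stays out.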