
I have a function that operates on a list, but it causes performance problems, so I would like to make it fully Dataset-based. A Dataset appears to offer similar methods to a list, so the translation should be painless, but I can't arrive at a solution that compiles.

def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = {
    val productsDatasetAsList = products.as[AggregateWithSupplierProducts].collect().toList
    val duplicates = productsDatasetAsList.groupBy(_.products.sortBy(_.product.productId)).filter(_._2.size > 1).values.toList
    mapToDataset(duplicates)
  } 

The problem starts right at the beginning: `groupBy` on a Dataset returns a `RelationalGroupedDataset`, which prevents me from conveniently translating the rest of the code.
Data structure:

case class AggregateWithSupplierProducts(
                                         id: String, 
                                         products: List[Product])
case class Product(productId: String, productCount: Int)
case class DuplicatesByIds(duplicates: List[String])

Example of data:

[
  {
    "id": "ID1",
    "products": [
      { "product": { "productId": "SOME_ID", "productCount": 1 } },
      { "product": { "productId": "SOME_OTHER_ID", "productCount": 1 } }
    ]
  },
  {
    "id": "ID2",
    "products": [
      { "product": { "productId": "SOME_ID", "productCount": 1 } },
      { "product": { "productId": "SOME_OTHER_ID", "productCount": 1 } }
    ]
  },
  {
    "id": "ID3",
    "products": [
      { "product": { "productId": "DIFFERENT_ID", "productCount": 1 } },
      { "product": { "productId": "SOME_OTHER_ID", "productCount": 1 } }
    ]
  },
  {
    "id": "ID4",
    "products": [
      { "product": { "productId": "SOME_OTHER_ID", "productCount": 1 } },
      { "product": { "productId": "DIFFERENT_ID", "productCount": 1 } }
    ]
  },
  {
    "id": "ID5",
    "products": [
      { "product": { "productId": "NOT_DUPLICATED_ID", "productCount": 1 } },
      { "product": { "productId": "DIFFERENT_ID", "productCount": 2 } }
    ]
  }
]

Result of this would be:

Dataset with 
DuplicatesByIds(List("ID1", "ID2")),
DuplicatesByIds(List("ID3", "ID4")) 

The code works correctly with the Dataset collected to a list, but I'm having considerable trouble translating it into a version that runs fully on the Dataset, without wasting memory.

xard4sTR
  • Indeed, `collect`ing the data to transform it later loses all the benefits of Spark. – Gaël J Aug 09 '22 at 20:06
  • Look at https://stackoverflow.com/questions/49747006/remove-all-records-which-are-duplicate-in-spark-dataframe, it's the opposite of what you're looking for but should help. – Gaël J Aug 09 '22 at 20:09

1 Answer


You can group and filter duplicates as follows. Note that I had to add another case class `P`; otherwise I couldn't parse your JSON sample:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import spark.implicits._ // encoders for .as[...]; imported automatically in the spark-shell

case class Product(productId: String, productCount: BigInt)
case class P(product: Product)
case class AggregateWithSupplierProducts(
                                         id: String,
                                         products: List[P])
case class DuplicatesByIds(duplicates: List[String])

def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] =
    (
        products
        // the sorted array of product IDs is the canonical grouping key,
        // so aggregates with the same products in a different order still match
        .withColumn("productList", sort_array(col("products.product.productId")))
        .groupBy("productList")
        // collect the ids of all aggregates sharing the same key
        .agg(collect_list("id").alias("duplicates"))
        // keep only keys shared by more than one aggregate
        .filter(size(col("duplicates")) > 1)
        .select("duplicates")
        .as[DuplicatesByIds]
    )

val ds = spark.read.option("multiline", "true").json("test.json").as[AggregateWithSupplierProducts]
ds.show(false)
+---+-----------------------------------------------+
|id |products                                       |
+---+-----------------------------------------------+
|ID1|[{{1, SOME_ID}}, {{1, SOME_OTHER_ID}}]         |
|ID2|[{{1, SOME_ID}}, {{1, SOME_OTHER_ID}}]         |
|ID3|[{{1, DIFFERENT_ID}}, {{1, SOME_OTHER_ID}}]    |
|ID4|[{{1, SOME_OTHER_ID}}, {{1, DIFFERENT_ID}}]    |
|ID5|[{{1, NOT_DUPLICATED_ID}}, {{2, DIFFERENT_ID}}]|
+---+-----------------------------------------------+

findDuplicates(ds).show
+----------+
|duplicates|
+----------+
|[ID3, ID4]|
|[ID1, ID2]|
+----------+
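For completeness, the canonical-key idea can be checked without Spark using plain Scala collections. This is a sketch with shortened hypothetical names (`Aggregate` stands in for `AggregateWithSupplierProducts`, and `findDuplicatesLocal` is not part of the original code); the point is only that the sorted list of product IDs serves as the grouping key:

```scala
// Minimal local model mirroring the case classes above.
case class Product(productId: String, productCount: Int)
case class P(product: Product)
case class Aggregate(id: String, products: List[P])

// Group aggregates by their sorted product IDs; groups of size > 1 are duplicates.
def findDuplicatesLocal(aggregates: List[Aggregate]): List[List[String]] =
  aggregates
    .groupBy(_.products.map(_.product.productId).sorted)
    .values
    .filter(_.size > 1)
    .map(_.map(_.id))
    .toList

val sample = List(
  Aggregate("ID1", List(P(Product("SOME_ID", 1)), P(Product("SOME_OTHER_ID", 1)))),
  Aggregate("ID2", List(P(Product("SOME_OTHER_ID", 1)), P(Product("SOME_ID", 1)))),
  Aggregate("ID5", List(P(Product("NOT_DUPLICATED_ID", 1))))
)
// findDuplicatesLocal(sample) groups ID1 and ID2 together; ID5 stands alone.
```

The Spark version above does exactly this, but distributed: `sort_array` builds the key and `collect_list` gathers the ids per key instead of an in-memory `groupBy`.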
qaziqarta
  • Hi, thank you for answering - I just edited the question with information about the data. – xard4sTR Aug 10 '22 at 12:12
  • The result of findDuplicates is a Dataset of DuplicatesByIds(duplicates: List[String]), where duplicates is the list of AggregateWithSupplierProducts ids whose products: List[Product] are identical. – xard4sTR Aug 10 '22 at 12:25
  • I also provided data examples and output. – xard4sTR Aug 10 '22 at 12:48
  • I adapted your solution: `def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = { products.groupByKey(_.products.sortBy(_.product.productId)) .mapGroups((key, values) => DuplicatesByIds(values.map(prod => prod.id).toList)).filter(_.duplicates.size > 1) }` It's working in most cases - but not when comparing and linking AggregateWithSupplierProducts whose products are unsorted. – xard4sTR Aug 10 '22 at 15:10
  • I pulled the sort out separately, before the groupBy, and everything works as it should. Thank you! – xard4sTR Aug 10 '22 at 15:59
  • Glad it helped! I edited the answer according to new details. – qaziqarta Aug 10 '22 at 22:00