
I'm working with a DataFrame with the columns basketID and itemID. Is there a way to efficiently parse through the dataset and generate a map where the keys are basketID and the value is a set of all the itemID contained within each basket?

My current implementation uses a for loop over the DataFrame, which isn't very scalable. Is it possible to do this more efficiently? Any help would be appreciated, thanks!

Sample data (from the screenshot): rows of (basketID, itemID) = (b1, i1), (b1, i2), (b1, i3), (b2, i2), (b2, i4), (b3, i3), (b3, i5), (b4, i6)

The goal is to obtain basket = Map("b1" -> Set("i1", "i2", "i3"), "b2" -> Set("i2", "i4"), "b3" -> Set("i3", "i5"), "b4" -> Set("i6")). Here's the implementation I have using a for loop:

// create empty container
val basket = scala.collection.mutable.Map[String, Set[String]]()
// loop over all numerical indexes for baskets (b<i>)
for (i <- 1 to 4) {
  basket("b" + i) = Set()
}
// loop over every row in df and store the items to the set
df.collect().foreach(row => 
  basket(row(0).toString) += row(1).toString
)
tyjchen
2 Answers


You can simply do an aggregateByKey operation, and then collectAsMap will directly give you the desired result. It is much more efficient than a simple groupBy.

import scala.collection.mutable
import spark.implicits._

// output is the input DataFrame (the question's df) with basketID and itemID columns
case class Items(basketID: String, itemID: String)

val result = output.as[Items].rdd
  .map(x => (x.basketID, x.itemID))
  .aggregateByKey[mutable.Buffer[String]](new mutable.ArrayBuffer[String]())(
    // seqOp: append each itemID to the per-partition buffer
    (l: mutable.Buffer[String], p: String) => l += p,
    // combOp: merge buffers from different partitions, dropping duplicates
    (l1: mutable.Buffer[String], l2: mutable.Buffer[String]) => (l1 ++ l2).distinct
  )
  .collectAsMap()

You can check the other aggregation APIs like reduceByKey and groupByKey here. Please also check the differences between aggregateByKey, groupByKey and reduceByKey.
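For comparison, here is a minimal groupByKey-based sketch (not tested, same Items case class and output DataFrame assumed as above). It produces the same map, but it shuffles every (basketID, itemID) pair across the network before grouping, while aggregateByKey combines values inside each partition first, which is why it is usually preferred:

// groupByKey variant for comparison only
val groupedResult = output.as[Items].rdd
  .map(x => (x.basketID, x.itemID))
  .groupByKey()            // RDD[(String, Iterable[String])]
  .mapValues(_.toSet)      // RDD[(String, Set[String])]
  .collectAsMap()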

kavetiraviteja
  • Thanks! This was able to work on a small test dataset, but once I run the .collectAsMap() command I get a memory error "java.lang.OutOfMemoryError: GC overhead limit exceeded". Any ideas how I can get around it? My understanding is that there isn't much I can do, since the collect function forces everything to go to the driver memory – tyjchen Sep 01 '20 at 14:47
  • Check this thread --> https://stackoverflow.com/questions/63574983/how-to-write-spark-dataframe-in-a-single-file-in-local-system-without-using-coal/63577161#63577161. You can use toLocalIterator instead of collectAsMap to tackle memory issues; then you need to fill a global map by iterating over the result of toLocalIterator (see the sketch after these comments). Please also check whether any partition size is more than spark.driver.maxResultSize; if that is the case, then increase spark.driver.maxResultSize as well. – kavetiraviteja Sep 01 '20 at 15:03
  • I increased the driver memory and the original solution ran very fast, thank you for your help! – tyjchen Sep 01 '20 at 15:34
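
A minimal sketch of the toLocalIterator approach mentioned above (not tested). It assumes aggregated is the pair RDD produced by the aggregateByKey call in the answer, before .collectAsMap() is applied; the variable name is illustrative:

import scala.collection.mutable

// aggregated: RDD[(String, mutable.Buffer[String])], i.e. the aggregateByKey
// result before collecting. toLocalIterator pulls one partition at a time
// into the driver instead of materialising the whole result at once.
val basket = mutable.Map[String, Set[String]]()
aggregated.toLocalIterator.foreach { case (basketID, items) =>
  basket(basketID) = items.toSet
}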

This is efficient assuming your dataset is small enough to fit into the driver's memory. .collect will give you an array of rows over which you iterate, which is fine. If you want scalability, then instead of Map[String, Set[String]] (this will reside in driver memory) you can use a PairRDD[String, Set[String]] (this will be distributed).

// NOT TESTED

// Assuming df is a DataFrame with 2 columns, the first is your basketID and the second is itemID
df.rdd
  .map(row => (row.getAs[String](0), row.getAs[String](1)))
  .groupByKey()
  .mapValues(x => x.toSet)
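
For reference, a small usage sketch under the same assumption about df (the basketRdd name is illustrative): the result stays distributed, so you can look up a single basket or only collect it when it is known to be small.

// build the distributed basketID -> Set(itemID) pair RDD
val basketRdd = df.rdd
  .map(row => (row.getAs[String](0), row.getAs[String](1)))
  .groupByKey()
  .mapValues(_.toSet)

// look up one basket without collecting everything to the driver
basketRdd.lookup("b1")                  // Seq(Set("i1", "i2", "i3"))

// or, only if the result fits in driver memory
val basketMap = basketRdd.collectAsMap()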

Samir Vyas