
In my Spark Scala application I have an RDD with the following format:

(05/05/2020, (name, 1))
(05/05/2020, (name, 1))
(05/05/2020, (name2, 1))
...
(06/05/2020, (name, 1))

What I want to do is group these elements by date and sum the tuples that have the same "name" as key.

Expected Output:

(05/05/2020, List[(name, 2), (name2, 1)]),
(06/05/2020, List[(name, 1)])
...

To achieve this, I am currently using a groupByKey operation followed by some extra transformations to group the tuples by key and calculate the sum for those that share the same name.
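A minimal sketch of that approach (simplified, not my exact code; assume rdd: RDD[(String, (String, Int))]):

// groupByKey ships every (name, 1) tuple across the network before any summing,
// then the per-name counts are computed inside each date group
val grouped = rdd.groupByKey().mapValues { tuples =>
  tuples
    .groupBy(_._1)                                        // group by name within each date
    .map { case (name, ts) => (name, ts.map(_._2).sum) }  // sum the 1s per name
    .toList
}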

For performance reasons, I would like to replace this groupByKey operation with a reduceByKey or an aggregateByKey in order to reduce the amount of data transferred over the network.

However, I can't get my head around how to do this. Both of these transformations take as a parameter a function over the values (tuples in my case), so I can't see how I can group the tuples by key in order to calculate their sum.

Is it doable?

– matrix

3 Answers


Yes, .aggregateByKey() can be used as follows:

import scala.collection.mutable.HashMap

// Folds a single (name, count) element into the running per-name count map
def merge(map: HashMap[String, Int], element: (String, Int)) = {
  if (map.contains(element._1)) map(element._1) += element._2 else map(element._1) = element._2
  map
}

val input = sc.parallelize(List(("05/05/2020",("name",1)),("05/05/2020", ("name", 1)),("05/05/2020", ("name2", 1)),("06/05/2020", ("name", 1))))

val output = input.aggregateByKey(HashMap[String, Int]())({
  // seqOp: combine a map with a single tuple (within a partition)
  case (map, element) => merge(map, element)
}, {
  // combOp: combine two maps (across partitions)
  case (map1, map2) =>
    val combined = (map1.keySet ++ map2.keySet).map { i => (i, map1.getOrElse(i, 0) + map2.getOrElse(i, 0)) }.toMap
    HashMap(combined.toSeq: _*)
}).mapValues(_.toList)
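
Collecting output should then give something like the following (element order within each list may vary, since it comes from a HashMap):

output.collect.foreach(println)
// (06/05/2020,List((name,1)))
// (05/05/2020,List((name,2), (name2,1)))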

credits: Best way to merge two maps and sum the values of same key?

– vdep

Here's how you can merge your Tuples using reduceByKey:

/**
File /path/to/file1:
15/04/2010  name
15/04/2010  name
15/04/2010  name2
15/04/2010  name2
15/04/2010  name3
16/04/2010  name
16/04/2010  name

File /path/to/file2:
15/04/2010  name
15/04/2010  name3
**/

import org.apache.spark.rdd.RDD

val filePaths = Array("/path/to/file1", "/path/to/file2").mkString(",")

val rdd: RDD[(String, (String, Int))] = sc.textFile(filePaths).
  map{ line =>
    val pair = line.split("\\t", -1)
    (pair(0), (pair(1), 1))
  }

rdd.
  map{ case (k, (n, v)) => (k, Map(n -> v)) }.
  reduceByKey{ (acc, m) =>
    acc ++ m.map{ case (n, v) => (n -> (acc.getOrElse(n, 0) + v)) }
  }.
  map(x => (x._1, x._2.toList)).
  collect
// res1: Array[(String, List[(String, Int)])] = Array(
//   (15/04/2010, List((name,3), (name2,2), (name3,2))), (16/04/2010, List((name,2)))
// )

Note that the initial mapping is needed because we want to merge the Tuples as elements in a Map, and reduceByKey for an RDD[(K, V)] requires the value type V to be the same before and after the transformation:

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
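
For comparison, aggregateByKey allows the aggregate type to differ from the value type, so the pre-mapping step can be folded into the sequence function; a minimal sketch along those lines, assuming the same rdd as above:

rdd.aggregateByKey(Map.empty[String, Int])(
  // seqOp: fold one (name, 1) tuple into the per-date map (runs within a partition)
  (acc, nv) => acc + (nv._1 -> (acc.getOrElse(nv._1, 0) + nv._2)),
  // combOp: merge the maps produced by different partitions
  (m1, m2) => m1 ++ m2.map { case (n, v) => n -> (m1.getOrElse(n, 0) + v) }
).mapValues(_.toList)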
– Leo C
  • Your solution, although minimal, doesn't seem to work. In the particular simple example you provided it works. However, I read my input data from different files, and for some strange reason these computations are only applied to the data taken from the first file. – matrix Oct 20 '18 at 10:49
  • I further tested this, and it seems that when I am reading 2 different files (through sc.textFile) the number of partitions is 2. It seems that your approach operates only on the first partition. When I repartition my RDD to 1 it works, however this is not optimal. – matrix Oct 20 '18 at 11:01
  • @matrix, I'm not able to replicate the said problem. Also tested the code with a larger RDD explicitly loaded into multiple partitions and it aggregated across the partitions as expected. As you can see, the proposed solution simply uses common RDD methods `map` and `reduceByKey` in their simplest form. Could you share a minimal version of the exact code (and sample data if possible) that led to the erroneous result? – Leo C Oct 20 '18 at 16:43
  • I created a small app that reproduces the issue. (In the comments you'll see the data for each file) You can find it here: https://pastebin.com/E7s2zcw4 – matrix Oct 20 '18 at 18:41
  • @matrix, thanks for the sample code and data that is able to demonstrate the said problem. The anomaly appears to be caused by the function in `reduceByKey` not being processed properly across partitions in some cases. The problem is so subtle that it wouldn't necessarily surface simply by having multiple partitions; and when it does surface it's also not that only the first partition is processed. I'm at this point inclined to consider it a bug. Rewriting the function to use a `Map ++ Map` operation, rather than `Map + element`, seems to fix the problem. Please see my revised answer. – Leo C Oct 20 '18 at 23:59

You could convert the RDD to a DataFrame and just use a groupBy with sum. Here is one way to do it:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(
  StructField("date", StringType, false) ::
  StructField("name", StringType, false) ::
  StructField("value", IntegerType, false) :: Nil)

val rd = sc.parallelize(Seq(
  ("05/05/2020", ("name", 1)),
  ("05/05/2020", ("name", 1)),
  ("05/05/2020", ("name2", 1)),
  ("06/05/2020", ("name", 1))))

val df = spark.createDataFrame(rd.map { case (a, (b, c)) => Row(a, b, c) }, schema)
df.show

+----------+-----+-----+
|      date| name|value|
+----------+-----+-----+
|05/05/2020| name|    1|
|05/05/2020| name|    1|
|05/05/2020|name2|    1|
|06/05/2020| name|    1|
+----------+-----+-----+

val sumdf = df.groupBy("date","name").sum("value")
sumdf.show

+----------+-----+----------+
|      date| name|sum(value)|
+----------+-----+----------+
|06/05/2020| name|         1|
|05/05/2020| name|         2|
|05/05/2020|name2|         1|
+----------+-----+----------+
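
If you need the result back in the (date, List[(name, count)]) shape from the question, one option is to collapse the rows with collect_list and struct (a sketch; the renamed count column is illustrative):

import org.apache.spark.sql.functions._

// Rename the aggregated column, then gather (name, count) structs per date
val perDate = sumdf
  .withColumnRenamed("sum(value)", "count")
  .groupBy("date")
  .agg(collect_list(struct(col("name"), col("count"))).as("counts"))

perDate.show(false)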
– sramalingam24