
I have a text file which I read and then split line by line, giving an RDD whose elements are arrays of the form Array(A, B, C, D, E, F, G, H, I).

I would like to find max(F) - min(G) for every key E (i.e. reduce by key E). Then I want to sum the resulting values by key C and append that sum to every row with the same key C.

For example:

+--+--+--+--+
| C| E| F| G|
+--+--+--+--+
|en| 1| 3| 1|
|en| 1| 4| 0|
|nl| 2| 1| 1|
|nl| 2| 5| 2|
|nl| 3| 9| 3|
|nl| 3| 6| 4|
|en| 4| 9| 1|
|en| 4| 2| 1|
+--+--+--+--+

Should result in the following (for example, for E = 1: max(F) = 4 and min(G) = 0, so the diff is 4; the sum for C = en is then 4 + 8 = 12):

+--+--+-------------+---+
| C| E|max(F)-min(G)|sum|
+--+--+-------------+---+
|en| 1| 4           |12 |
|nl| 2| 4           |10 |
|nl| 3| 6           |10 |
|en| 4| 8           |12 |
+--+--+-------------+---+

What would be the best way to tackle this? Currently I am computing max(F) - min(G) by running

val maxCounts = logEntries.map(line => (line(4), line(5).toLong)).reduceByKey((x, y) => math.max(x, y))
val minCounts = logEntries.map(line => (line(4), line(6).toLong)).reduceByKey((x, y) => math.min(x, y))

val maxMinCounts = maxCounts.join(minCounts).map { case (id, (maxF, minG)) => (id, maxF - minG) }

I would then join the resulting RDDs back to my data. However, this becomes tricky when I also want to sum these values per C and append that sum to my existing data set.
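One way I can imagine continuing is something like this (an untested sketch; it assumes each E belongs to exactly one C, as in the sample above), but it feels like a lot of joins:

// untested sketch: recover the (E -> C) mapping, sum the per-E diffs per C,
// and join that sum back onto every (C, E, diff) row
val eToC = logEntries.map(line => (line(4), line(2))).distinct()

val diffWithC = maxMinCounts.join(eToC)
  .map { case (e, (diff, c)) => (c, (e, diff)) }

val sumPerC = diffWithC.mapValues(_._2).reduceByKey(_ + _)

val appended = diffWithC.join(sumPerC)
  .map { case (c, ((e, diff), total)) => (c, e, diff, total) }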

I would love to hear any suggestions!

Laurens
  • It is easy when you work with a Spark SQL DataFrame: you can convert your RDD to a DataFrame and do all the aggregate operations. Try this link: http://stackoverflow.com/questions/33882894/sparksql-apply-aggregate-functions-to-a-list-of-column – Shankar Oct 04 '16 at 16:47
  • Why not combine `math.max` and `math.min` into the same RDD? – OneCricketeer Oct 04 '16 at 16:47
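
For illustration, OneCricketeer's suggestion of doing the max and min in a single pass might look roughly like this (a sketch, not code from the comment):

// single pass: carry (max F so far, min G so far) per key E, then take the difference
val maxMinCounts = logEntries
  .map(line => (line(4), (line(5).toLong, line(6).toLong)))
  .reduceByKey { case ((maxF, minG), (f, g)) => (math.max(maxF, f), math.min(minG, g)) }
  .mapValues { case (maxF, minG) => maxF - minG }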

2 Answers


This kind of logic is also easily implemented in the DataFrame API, but you need to explicitly form your columns from the array:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, min, sum}
import spark.implicits._  // for .toDF and the 'symbol column syntax (Spark 2.x; use sqlContext.implicits._ on 1.x)

val window = Window.partitionBy('C)

val df = rdd
  .map { case Array(_, _, c, _, e, f, g, _, _) => (c, e, f.toInt, g.toInt) }  // cast F and G so max/min are numeric
  .toDF("C", "E", "F", "G")
  .groupBy('C, 'E)
  .agg((max('F) - min('G)).as("diff"))
  .withColumn("sum", sum('diff).over(window))  // per-C sum of the diffs
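
Here `rdd` is assumed to be your split log lines, e.g. something like the following (the file name and delimiter are placeholders, adjust them to your input):

// hypothetical input step: one Array[String] of nine fields per line
val rdd = sc.textFile("logfile.txt").map(_.split("\t"))

df.show()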
Wilmerton
  • Thank you for your suggestion. I changed `sum(diff)` to `sum('diff)` and `Window.partitionBy('v)` to `Window.partitionBy('C)` because it would result in an error otherwise. However, when I try running this code I get the following error: `scala.MatchError: [Ljava.lang.String;@f908897 (of class [Ljava.lang.String;)`. Using `Window.partitionBy('v)` resulted in `org.apache.spark.sql.AnalysisException: cannot resolve '`v`' given input columns: [C, E, diff];`. – Laurens Oct 05 '16 at 10:38
  • I solved this problem by using the Spark CSV reader instead of reading the input file directly. The problem probably had to do with special characters in the input. – Laurens Oct 05 '16 at 16:17

Assuming, as in your sample data, that unique E's never span multiple C's, you could do something like this.

import math.{max, min}

// holds the running (max F, min G) for a key
case class FG(f: Int, g: Int) {
  def combine(that: FG) =
    FG(max(f, that.f), min(g, that.g))
  def result = f - g
}

val result = rdd
  .map { case Array(_, _, c, _, e, f, g, _, _) =>
    ((c, e), FG(f.toInt, g.toInt)) }   // key by (C, E); cast the string fields
  .reduceByKey(_ combine _)            // per (C, E): (max F, min G)
  .map { case ((c, _), fg) =>
    (c, fg.result) }                   // per-(C, E) diff, keyed by C
  .reduceByKey(_ + _)                  // sum of the diffs per C
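
As a quick sanity check against the sample data in the question, collecting this should give the per-C sums from the expected output:

// should print (en,12) and (nl,10), matching the sum column above
result.collect().foreach(println)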
kmh