
I've been working quite a lot with Apache Spark over the last few months, but now I have received a pretty difficult task: to compute the average/minimum/maximum etcetera over a sliding window on a paired RDD, where the key component is a date tag and the value component is a matrix. So each aggregation function should also return a matrix, where each cell is the average of the corresponding cell across all matrices in the time period.

I want to be able to say, for example, that I want the average over every 7 days, with the window sliding by one day. The window always moves by one unit, in whatever unit the window size is expressed (so if the window is 12 weeks, it moves by one week at a time).

My initial thought is to simply iterate X times if we want an average per X days, and on each iteration group the elements by their date, with an offset.

So if we have this scenario:

Days: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

Matrices: A B C D E F G H I J K L M N O

And we want the average per 5 days, then I will iterate 5 times; the groupings for each iteration look like this:

First iteration:

Group 1: (1, A) (2, B) (3, C) (4, D) (5, E)

Group 2: (6, F) (7, G) (8, H) (9, I) (10, J)

Group 3: (11, K) (12, L) (13, M) (14, N) (15, O)

Second iteration:

Group 1: (2, B) (3, C) (4, D) (5, E) (6, F)

Group 2: (7, G) (8, H) (9, I) (10, J) (11, K)

Group 3: (12, L) (13, M) (14, N) (15, O)

Et cetera; for each group, I then have to do a fold/reduce procedure to get the average.
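
To make this concrete, here is roughly what I have in mind, just as a sketch: I'm assuming day indices as Int keys, matrices represented as Array[Array[Double]], and the elementwise helpers are my own.

import org.apache.spark.rdd.RDD

// Elementwise helpers for the assumed matrix representation (Array[Array[Double]])
def add(a: Array[Array[Double]], b: Array[Array[Double]]): Array[Array[Double]] =
  a.zip(b).map { case (ra, rb) => ra.zip(rb).map { case (x, y) => x + y } }

def divide(a: Array[Array[Double]], n: Double): Array[Array[Double]] =
  a.map(_.map(_ / n))

// One grouping pass per offset, mirroring the iterations shown above
def slidingAverages(data: RDD[(Int, Array[Array[Double]])],
                    windowSize: Int): Seq[RDD[(Int, Array[Array[Double]])]] =
  (0 until windowSize).map { offset =>
    data
      .filter { case (day, _) => day > offset }                             // drop days before this pass's first window
      .map { case (day, m) => ((day - offset - 1) / windowSize, (m, 1)) }   // bucket into groups of windowSize days
      .reduceByKey { case ((m1, c1), (m2, c2)) => (add(m1, m2), c1 + c2) }  // sum matrices and counts per group
      .mapValues { case (sum, count) => divide(sum, count) }                // elementwise average per group
  }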

However, as you might imagine, this is pretty slow and probably a rather bad way to do it. I can't really figure out any better way to do it, though.

Johan S
  • I assume that on the first iteration, you simultaneously work on each of the groups? When I first started with Spark I had a bad habit of creating a new RDD that would represent Group 1 (iteration 1) and use map/reduce to find min/max/average, instead of operating on the original RDD as a whole. Essentially that sapped all the power out of Spark. I don't care to admit how long it took before I realized what I was doing. – TravisJ Dec 17 '14 at 15:42
  • This is the best attempt I've seen on this subject: http://stackoverflow.com/questions/23402303/apache-spark-moving-average – maasg Dec 17 '14 at 16:15
  • @TravisJ I iterate sequentially but then I group them by key, then reduce, all simultaneously. That's correct right? If you think looking at the code would help I can post it? – Johan S Dec 17 '14 at 19:53
  • I don't know that I would be very effective at reading Scala. It sounds right. It should be correct if you are mapping (1, 2, 3, 4, 5) to key=Group 1, (6, 7, 8, 9, 10) to key=Group 2, then reduce by key. (The map and reduce applied in parallel.) The bad habit I had was to do something like newRDD = filter().reduce() where the filtering selected out the elements in group 1, and then the reduce found the min/max/avg, then filter again (in serial) for the next group. Essentially I was only using the parallelism to compute avg, find min/max. It was really bad. – TravisJ Dec 17 '14 at 20:38
  • @TravisJ Yeah, I made some similar mistakes when I started out too. Thanks again for your response; I'm doing it like you describe in your latest comment, cheers! – Johan S Dec 18 '14 at 07:09
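
To illustrate the difference discussed in these comments, here is a rough sketch; the (groupId, value) pairs and the sample numbers are made up for illustration, and sc is assumed to be a SparkContext.

// Anti-pattern: one filter + reduce per group, driven serially from the driver
val pairs = sc.parallelize(Seq((1, 10.0), (1, 5.0), (2, 7.0), (2, 14.0)))  // groupId -> value

val serialAverages = pairs.keys.distinct.collect().map { g =>
  val vals = pairs.filter(_._1 == g).values
  g -> vals.sum / vals.count                       // a separate Spark job per group
}

// Better: a single reduceByKey over the whole RDD, so all groups are aggregated in parallel
val parallelAverages = pairs
  .mapValues(v => (v, 1L))
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
  .mapValues { case (sum, count) => sum / count }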

1 Answer


If you convert to a DataFrame, this all gets a lot simpler -- you can just self-join the data back on itself and find the average. Say I have a series of data like this:

tsDF.show
date       amount
1970-01-01 10.0
1970-01-01 5.0
1970-01-01 7.0
1970-01-02 14.0
1970-01-02 13.9
1970-01-03 1.0
1970-01-03 5.0
1970-01-03 9.0
1970-01-04 9.0
1970-01-04 5.8
1970-01-04 2.8
1970-01-04 8.9
1970-01-05 8.1
1970-01-05 2.1
1970-01-05 2.78
1970-01-05 20.78
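
For reference, a DataFrame like this could be put together roughly as follows; this is only a sketch, assuming a SQLContext named sqlContext is in scope (as in the spark-shell) and reusing the first few rows shown above.

import java.sql.Date
import sqlContext.implicits._  // also brings the $"..." column syntax used below into scope

val tsDF = sc.parallelize(Seq(
    ("1970-01-01", 10.0), ("1970-01-01", 5.0), ("1970-01-01", 7.0),
    ("1970-01-02", 14.0), ("1970-01-02", 13.9),
    ("1970-01-03", 1.0),  ("1970-01-03", 5.0), ("1970-01-03", 9.0)
    // ... remaining rows from the sample above
  ))
  .map { case (d, amount) => (Date.valueOf(d), amount) }  // parse the date strings into java.sql.Date
  .toDF("date", "amount")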

Which rolls up as:

tsDF.groupBy($"date").agg($"date", sum($"amount"), count($"date")).show
date       SUM(amount) COUNT(date)
1970-01-01 22.0        3
1970-01-02 27.9        2
1970-01-03 15.0        3
1970-01-04 26.5        4
1970-01-05 33.76       4

I would then need to create a UDF to shift the date for the join condition (note I am only using a 2-day window here, by setting offset = -2):

import java.util.Calendar
import org.apache.spark.sql.functions.udf

// Shift a date back by `offset` days; -2 gives the 2-day window used in the join below
def dateShift(myDate: java.sql.Date): java.sql.Date = {
  val offset = -2
  val cal = Calendar.getInstance
  cal.setTime(myDate)
  cal.add(Calendar.DATE, offset)
  new java.sql.Date(cal.getTime.getTime)
}
val udfDateShift = udf[java.sql.Date, java.sql.Date](dateShift)

And then I could easily find a 2-day rolling average like this:

// One row per distinct date on the left, joined to every record within the preceding 2 days on the right
val windowDF = tsDF.select($"date")
  .groupBy($"date")
  .agg($"date")
  .join(
    tsDF.select($"date" as "r_date", $"amount" as "r_amount"),
    $"r_date" > udfDateShift($"date") and $"r_date" <= $"date"
  )
  .groupBy($"date")
  .agg($"date", avg($"r_amount") as "2 day avg amount / record")

windowDF.show
date       2 day avg amount / record
1970-01-01 7.333333333333333
1970-01-02 9.98
1970-01-03 8.58
1970-01-04 5.928571428571429
1970-01-05 7.5325

While this isn't exactly what you were trying to do, it shows how you can use a DataFrame self-join to extract running averages from a data set. Hope you found this helpful.

David Griffin
  • That doesn't seem like it will generalize very well to a larger window size, because a window size of e.g. 7 will mean 6 joins, right? It might be better to use sliding in MLlib: https://issues.apache.org/jira/browse/SPARK-1241 – Ken Williams May 08 '15 at 18:55
  • No actually, the window is configurable in the UDF -- `offset = -2` sets it to a window starting two days in the past. You could also make the UDF take an Int that is the window, and probably even do a join so that the window is different based on certain criteria on the fly. That being said, I have never used MLlib -- never thought I was interested in Machine Learning -- so I don't know sliding, but will look at it -- thanks! – David Griffin May 08 '15 at 20:56
  • I see, I wasn't grasping how the join actually worked. Thanks. – Ken Williams May 08 '15 at 21:08
  • I'm going to clean up my answer a little, hopefully make it a little clearer what I am doing. – David Griffin May 09 '15 at 11:53
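
For completeness, here is a rough sketch of the sliding approach mentioned in the comments above, using RDDFunctions.sliding from MLlib. This is only a sketch: it assumes the tsDF from the answer is in scope and keys the daily averages by epoch milliseconds so they can be sorted before windowing.

import org.apache.spark.mllib.rdd.RDDFunctions._

// One daily average per date (keyed by epoch millis so sortByKey has an ordering), sorted by date
val dailyAvg = tsDF.rdd
  .map(r => (r.getAs[java.sql.Date](0).getTime, (r.getDouble(1), 1)))
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
  .mapValues { case (sum, count) => sum / count }
  .sortByKey()

// Each window is an Array of 2 consecutive (millis, dailyAvg) pairs; label it with the later date
val rolling2Day = dailyAvg
  .sliding(2)
  .map(w => (new java.sql.Date(w.last._1), w.map(_._2).sum / w.size))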