If I have an RDD with volumes per minute, e.g.

(("12:00" -> 124), ("12:01" -> 543), ("12:02" -> 102), ... )

And I want to map that to a dataset containing the volume in the current minute, the volume in the previous minute, and the average volume over the previous five minutes. E.g.

(("12:00" -> (124, 300, 245.3)),
("12:01" -> (543, 124, 230.2)),
("12:02" -> (102, 543, 287.1)))

The input RDD could be an RDD[(DateTime, Int)] and the output an RDD[(DateTime, (Int, Int, Float))].

What are good ways to go about doing that?

Patrick McGloin
  • Is your data complete or is it possible that there are some missing records? – zero323 Sep 11 '15 at 15:17
  • There could be gaps, which I would default to zero. I don't mind if the solution takes care of this or not. – Patrick McGloin Sep 11 '15 at 15:27
  • In pure Scala, I'd convert to DateTime and use a SortedMap. How big is your dataset? – Reactormonk Sep 11 '15 at 15:30
  • I guess this approach could help, it creates windows over a time series: http://stackoverflow.com/questions/23402303/apache-spark-moving-average . I figure creating windows is close to what you are trying to do, that is, grouping a value together with its previous/next values in the series (see the sketch after these comments). – GPI Sep 11 '15 at 15:42
  • Comments appreciated, useful stuff. – Patrick McGloin Sep 11 '15 at 17:10
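
For reference, the moving-average question linked in the comments relies on the sliding transformation from org.apache.spark.mllib.rdd.RDDFunctions. A minimal sketch of that idea, assuming a gap-free series and an input RDD named volumes (a name used here purely for illustration), could look like this:

import org.joda.time.DateTime
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._

// volumes: RDD[(DateTime, Int)], assumed to have one record per minute with no gaps
def withHistory(volumes: RDD[(DateTime, Int)]): RDD[(DateTime, (Int, Int, Float))] =
  volumes
    .sortBy { case (dt, _) => dt.getMillis }  // make the time ordering explicit
    .sliding(6)                               // 5 preceding minutes plus the current one
    .map { window =>
      val (time, current) = window.last
      val previous = window.init.last._2
      val average = window.init.map(_._2).sum / window.init.length.toFloat
      (time, (current, previous, average))
    }

Note that sliding(6) emits nothing for the first five minutes of the series, so the earliest rows are dropped rather than padded with zeros.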

1 Answer


Converting to a DataFrame and using window functions can cover the lag, the average, and the possible gaps:

import com.github.nscala_time.time.Imports._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{lag, avg, when}
import org.apache.spark.sql.expressions.Window
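// note: toDF and the $-column syntax used below come from the SQLContext
// implicits (import sqlContext.implicits._)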

val fmt = DateTimeFormat.forPattern("HH:mm:ss")

val rdd = sc.parallelize(Seq(
  ("12:00:00" -> 124), ("12:01:00" -> 543), ("12:02:00" -> 102),
  ("12:30:00" -> 100), ("12:31:00" -> 101)
).map{case (ds, vol) => (fmt.parseDateTime(ds), vol)})

val df = rdd
  // Convert to millis for window range
  .map{case (dt, vol) => (dt.getMillis, vol)} 
  .toDF("ts", "volume")

val w = Window.orderBy($"ts")

val transformed = df.select(
  $"ts", $"volume",
  when(
    // Check if we have data from the previous minute
    (lag($"ts", 1).over(w) - $"ts").equalTo(-60000), 
    // If so get lag otherwise 0
    lag($"volume", 1).over(w)).otherwise(0).alias("previous_volume"),
  // Average over window 
  avg($"volume").over(w.rangeBetween(-300000, 0)).alias("average"))

// Optionally go to back to RDD
transformed.map{
  case Row(ts: Long, volume: Int, previousVolume: Int, average: Double) =>
    (new DateTime(ts) -> (volume, previousVolume, average))
}
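
// If the exact output type from the question, RDD[(DateTime, (Int, Int, Float))],
// is required, the Double average can be narrowed in the pattern above, e.g.
// (new DateTime(ts) -> (volume, previousVolume, average.toFloat))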

Just be aware that window functions without a PARTITION BY clause move all of the data into a single partition, so they are quite inefficient.
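
If the series spans a long time range, one way to mitigate this is to partition the window by a coarse time bucket. The sketch below is only an illustration and assumes it is acceptable for the lag and the average to reset at bucket boundaries; the day column is introduced here for that purpose and is not part of the original data:

// Bucket rows by calendar day (86400000 ms) so each window partition is sorted
// locally instead of shuffling everything into a single partition. The lag and
// the 5-minute average then reset at every day boundary.
val withDay = df.withColumn("day", ($"ts" / 86400000L).cast("long"))
val wPerDay = Window.partitionBy($"day").orderBy($"ts")

wPerDay can then be substituted for w in the select above, applied to withDay instead of df.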

zero323