
So I have an RDD with irregular time series data:

1, <value1>
4, <value4>
6, <value6>
..etc.

and I need to fill it in to get a regular time series:

1, <value1>
2, <value1>
3, <value1>
4, <value4>
5, <value4>
6, <value6>
..etc.

So far I have created an RDD with keys 1, 2, 3, 4, 5, 6, ... and then leftOuterJoin'ed it to the original RDD, which gave me:

1, <value1>
2, <None>
3, <None>
4, <value4>
5, <None>
6, <value6>
..etc.
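For reference, a minimal sketch of that step (the `irregular` name, the value type, and the 1 to 6 range are just for illustration):

// Build a dense key range and left-outer-join the original, irregular RDD onto it.
val dense = sc.parallelize(1L to 6L).map(t => (t, ()))

val joined = dense
  .leftOuterJoin(irregular)            // irregular: RDD[(Long, Double)], the original data
  .mapValues { case (_, opt) => opt }  // yields (time, Some(value)) or (time, None)
  .sortByKey()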

So the problem I am facing is filling rows 2, 3, 5 with the value from the previous non-null row.

I would prefer to do it at the RDD level without going to sparkSQL, which is of course a last resort option. Going down to the Scala Array level isn't very inviting either, since for performance reasons I would prefer to keep it at the RDD level.

Thanks

aleck
  • _going to sparkSQL (...) is of course a last resort option_ - could you explain why? – zero323 Oct 23 '15 at 04:10
  • at that point the only two solutions I would have had were: a) a window function via sparkSQL; b) pure Scala outside of Spark – aleck Oct 23 '15 at 09:45
  • Based on my previous Oracle experience, the window function was generally the slowest one, so I guess the reason for not going that way is purely psychological :) – aleck Oct 23 '15 at 09:51
  • Actually, it is pretty good intuition in this case. Spark window functions perform relatively well only if you can provide a `PARTITION BY` clause. Here it is not impossible, but far from straightforward, and it cannot be done with a single job. – zero323 Oct 25 '15 at 08:46

1 Answer


A relatively simple solution without the initial join. Let's start with dummy data and a helper function:

val rdd = sc.parallelize(Seq(
    (3L, 1.0), (1L, 4.0), (5L, 3.6), (7L, 0.2), (8L, 0.0)))

// For a pair of consecutive points, emit the first point plus a copy of its
// value for every missing time step before the second point; the second point
// itself is emitted by the next sliding window.
def fillTimePoints(xs: Array[(Long, Double)]) = xs match {
  case Array((xTime, xValue), (yTime, _)) => {
    val diff = yTime - xTime

    if (diff == 0) Seq((xTime, xValue))
    else (xTime, xValue) +: (1 until diff.toInt)
      .map(_.toLong)
      .map(i => (i + xTime, xValue))
  }

  case _ => Seq.empty[(Long, Double)]
}

All that is left now is to slide over the sorted RDD:

import org.apache.spark.mllib.rdd.RDDFunctions._

rdd.sortBy(_._1).sliding(2).flatMap(fillTimePoints).collect

//  Array[(Long, Double)] = Array((1,4.0), (2,4.0), (3,1.0), 
//    (4,1.0), (5,3.6), (6,3.6), (7,0.2))

Notes:

  • sliding is part of the developer API. Most of the methods from its class have been deprecated in recent releases. It is still possible to code it from scratch, but for now it should work.

  • you may prefer using RangePartitioner followed by repartitionAndSortWithinPartitions instead of sorting. Then you can apply local sliding using mapPartitions with preservesPartitioning set to true, and finally fill the gaps (once again with preservesPartitioning). It requires more work, but you get output partitioned with RangePartitioner, which can be useful. A rough sketch of this approach is shown below.
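A rough sketch of that second approach, reusing rdd and fillTimePoints from above (the partition count and the new variable names are my own assumptions; the pass that fills gaps across partition boundaries is only noted in a comment, not implemented):

import org.apache.spark.RangePartitioner

// Range-partition and sort within partitions instead of a global sortBy.
val partitioner = new RangePartitioner(4, rdd)  // 4 partitions chosen arbitrarily
val sorted = rdd.repartitionAndSortWithinPartitions(partitioner)

// Slide locally inside each partition; fillTimePoints already returns an empty
// Seq for windows that are not pairs, so single-record partitions are safe.
val locallyFilled = sorted.mapPartitions(
  iter => iter.sliding(2).flatMap(w => fillTimePoints(w.toArray)),
  preservesPartitioning = true)

// Gaps that span partition boundaries (and the very last record) still need a
// separate pass, e.g. by collecting each partition's first record and
// broadcasting it, as noted above.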

zero323
  • Interesting, did not know about the `sliding()` method. What happens when there is a gap between records on the edges of partitions (e.g. last record of partition 1 and first record of partition 2 have a gap between them)? – Rohan Aletty Oct 23 '15 at 05:58
  • @RohanAletty Since sliding handles switching between partitions, that case is covered. You can do something similar manually in two steps, using either broadcast (see my answers here: http://stackoverflow.com/a/31686744/1560062, http://stackoverflow.com/a/33072089/1560062) or partitioning (see Daniel's solution here: http://stackoverflow.com/a/23436517/1560062). – zero323 Oct 23 '15 at 06:36
  • Superb solution, thanks a bunch. Wasn't aware of the sliding window function; extremely valuable for time series, of course! – aleck Oct 23 '15 at 10:15