42

I have a huge file in HDFS containing time series data points (Yahoo stock prices).

I want to compute the moving average of the time series. How do I go about writing an Apache Spark job to do that?

Ahmed Shabib
  • Moving average is a tricky problem for Spark, and any distributed system. When the data is spread across multiple machines, there will be some time windows that cross partitions. I think the key is duplicating data points at the start and end of partitions. I will try to think of a way to do this in Spark. – Daniel Darabos May 01 '14 at 12:54
  • Why can't this be done by traversing the RDD? That returns the partitions in order. – Adrian Jan 12 '15 at 21:24
  • This is the same answer as @Arvind but written in Java: http://stackoverflow.com/questions/31965615/moving-average-in-spark-java/35117608#35117608 – Victor Jan 31 '16 at 18:29
  • @Victor Well that was a year later !! – Ahmed Shabib Feb 23 '16 at 20:43
  • Better later than never, I guess – Victor Feb 24 '16 at 08:29

3 Answers

31

You can use the sliding function from MLlib, which probably does the same thing as Daniel's answer. You will have to sort the data by time before using the sliding function.

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)                                      // windows of 3 consecutive elements
  .map(curSlice => curSlice.sum / curSlice.size)   // average of each window (integer division for Ints)
  .collect()
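If the time series arrives as (timestamp, price) pairs rather than an already ordered range, a minimal sketch of sorting by time before sliding might look like this (the sample data below is made up):

import org.apache.spark.mllib.rdd.RDDFunctions._

// Hypothetical (timestamp, price) pairs; sliding assumes the RDD is already ordered.
val prices = sc.parallelize(Seq((3L, 11.0), (1L, 10.0), (2L, 12.0), (5L, 12.5), (4L, 13.0)))
prices
  .sortByKey()                  // order by timestamp
  .map(_._2)                    // keep just the price
  .sliding(3)                   // windows of 3 consecutive prices
  .map(w => w.sum / w.size)     // moving average per window
  .collect()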
Arvind
  • Awesome! It's not exactly like my answer. It fetches the first (window-1) elements from each partition and uses this small amount of data to fill in the gaps. ([code](https://github.com/apache/spark/blob/v1.4.1/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala)) – Daniel Darabos Jul 30 '15 at 14:26
  • In MapReduce we would need a custom InputFormat that reads a few additional lines from the next split to get the complete window, just like TextInputFormat reads some additional data from the next split. – user2458922 Nov 28 '18 at 20:01
  • The map method could keep accumulating values into a list up to the size of the window. Once that size is reached, compute the average and do context.write(). On the next map() call, add the new value to the list, drop the oldest value, compute the average and do context.write() again. Spark does not give you that kind of control over accumulating values within a task and managing their count, etc. – user2458922 Nov 28 '18 at 20:01
  • .sliding(3).map(curSlice => (curSlice.sum / curSlice.size)) seems simple. What would the data type of curSlice be? If the values are not numbers but text, and we need to find the most frequent words in a window, can curSlice support all data types? @Arvind (a sketch follows below) – user2458922 Nov 29 '18 at 15:12
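For what it's worth, sliding is generic: curSlice is an Array of whatever the RDD's element type is, so non-numeric data works too. A small sketch (with made-up words) of the most-frequent-word-per-window case:

import org.apache.spark.mllib.rdd.RDDFunctions._

val words = sc.parallelize(Seq("spark", "hdfs", "spark", "yarn", "spark", "hdfs"))
words
  .sliding(3)
  .map(w => w.groupBy(identity).maxBy(_._2.length)._1)   // most frequent word in each window
  .collect()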
23

Moving average is a tricky problem for Spark, and any distributed system. When the data is spread across multiple machines, there will be some time windows that cross partitions. We have to duplicate the data at the start of the partitions, so that calculating the moving average per partition gives complete coverage.

Here is a way to do this in Spark. The example data:

val ts = sc.parallelize(0 to 100, 10)
val window = 3

A simple partitioner that puts each row in the partition we specify by the key:

class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = key.asInstanceOf[Int]
}
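In other words, the key of each row is interpreted directly as the index of the partition it should land in. A trivial check (not from the original answer):

new StraightPartitioner(10).getPartition(3)   // == 3: rows keyed with 3 go to partition 3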

Create the data with the first `window - 1` rows of each partition also copied to the previous partition:

val partitioned = ts.mapPartitionsWithIndex((i, p) => {
  val overlap = p.take(window - 1).toArray        // first window - 1 rows of this partition
  val spill = overlap.iterator.map((i - 1, _))    // copies keyed for the previous partition
  val keep = (overlap.iterator ++ p).map((i, _))  // everything keyed for this partition
  if (i == 0) keep else keep ++ spill             // partition 0 has no previous partition to feed
}).partitionBy(new StraightPartitioner(ts.partitions.length)).values

Just calculate the moving sums on each partition (dividing by the window size gives the averages):

val movingAverage = partitioned.mapPartitions(p => {
  val sorted = p.toSeq.sorted
  val olds = sorted.iterator                // trails window - 1 elements behind news
  val news = sorted.iterator
  var sum = news.take(window - 1).sum       // prime the running sum with the first window - 1 values
  (olds zip news).map({ case (o, n) => {
    sum += n                                // newest value enters the window
    val v = sum
    sum -= o                                // oldest value leaves the window
    v
  }})
})

Because of the duplicate segments this will have no gaps in coverage.

scala> movingAverage.collect.sameElements(3 to 297 by 3)
res0: Boolean = true
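As noted above, these are the moving sums; a one-line follow-up (reusing the window value from before) turns them into averages:

val averages = movingAverage.map(_.toDouble / window)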
Daniel Darabos
  • The sorting in the last step may be unnecessary. It seems the data arrives sorted anyway. I don't know if there are guarantees for the repartitioning to behave this way. – Daniel Darabos May 02 '14 at 20:37
  • Why can't this be done by traversing the RDD? That returns the partitions in order... then you just need to duplicate the parts at the ends of the RDD. I wonder if updateStateByKey would help make things easier. – Adrian Jan 12 '15 at 21:26
  • It is an interesting approach but you're making a risky assumption that there are no empty / too short partitions. For example: `val m = Map(1 -> (0 to 50).toIterator, 4 -> (51 to 100).toIterator).withDefault(i => Iterator()); val ts = sc.parallelize(Seq.empty[Int], 10).mapPartitionsWithIndex((i, _) => m(i))` – zero323 Oct 23 '15 at 20:11
  • I use something similar [here](http://stackoverflow.com/a/31686744/1560062) and [here](http://stackoverflow.com/a/33072089/1560062) with broadcast variables instead of partitioner and assign data based on counts. – zero323 Oct 25 '15 at 10:45
20

Spark 1.4 introduced window functions, which means that you can compute a moving average as follows (adjust the window with rowsBetween):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg
import sqlContext.implicits._   // for toDF and the $"..." syntax (available in spark-shell / Zeppelin)

val schema = Seq("id", "cykle", "value")
val data = Seq(
  (1, 1, 1),
  (1, 2, 11),
  (1, 3, 1),
  (1, 4, 11),
  (1, 5, 1),
  (1, 6, 11),
  (2, 1, 1),
  (2, 2, 11),
  (2, 3, 1),
  (2, 4, 11),
  (2, 5, 1),
  (2, 6, 11)
)

val dft = sc.parallelize(data).toDF(schema: _*)

dft.select('*).show

// PARTITION BY id ORDER BY cykle ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING (a window of 5 rows)
val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(-2, 2)

val x = dft.select($"id", $"cykle", avg($"value").over(w))
x.show

Output (in Zeppelin):

schema: Seq[String] = List(id, cykle, value)
data: Seq[(Int, Int, Int)] = List((1,1,1), (1,2,11), (1,3,1), (1,4,11), (1,5,1), (1,6,11), (2,1,1), (2,2,11), (2,3,1), (2,4,11), (2,5,1), (2,6,11))
dft: org.apache.spark.sql.DataFrame = [id: int, cykle: int, value: int]
+---+-----+-----+
| id|cykle|value|
+---+-----+-----+
|  1|    1|    1|
|  1|    2|   11|
|  1|    3|    1|
|  1|    4|   11|
|  1|    5|    1|
|  1|    6|   11|
|  2|    1|    1|
|  2|    2|   11|
|  2|    3|    1|
|  2|    4|   11|
|  2|    5|    1|
|  2|    6|   11|
+---+-----+-----+
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@55cd666f
x: org.apache.spark.sql.DataFrame = [id: int, cykle: int, 'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: double]
+---+-----+-------------------------------------------------------------------------+
| id|cykle|'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING|
+---+-----+-------------------------------------------------------------------------+
|  1|    1|                                                        4.333333333333333|
|  1|    2|                                                                      6.0|
|  1|    3|                                                                      5.0|
|  1|    4|                                                                      7.0|
|  1|    5|                                                                      6.0|
|  1|    6|                                                        7.666666666666667|
|  2|    1|                                                        4.333333333333333|
|  2|    2|                                                                      6.0|
|  2|    3|                                                                      5.0|
|  2|    4|                                                                      7.0|
|  2|    5|                                                                      6.0|
|  2|    6|                                                        7.666666666666667|
+---+-----+-------------------------------------------------------------------------+
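If you prefer a friendlier column name than the generated one, the window column can be aliased (a small variation on the snippet above, not part of the original output):

val x = dft.select($"id", $"cykle", avg($"value").over(w).alias("moving_avg"))
x.show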
oluies
  • Also check out this blog article: http://xinhstechblog.blogspot.de/2016/04/spark-window-functions-for-dataframes.html It is a more practical explanation of how window functions work than the official announcement. – Michael Koenig Jul 14 '16 at 09:37
  • What happens if you don't have anything to partition by, that is, if you need to perform the moving average on all of the data? This is my case, as I have time series data and nothing to partition by. In this case all of the data would be moved to one node, which is a problem, right? How can I overcome this issue? (see the sketch after this thread) – Kobe-Wan Kenobi Apr 05 '17 at 09:57
  • @Marko what is the data? Have a look at approximate quantiles and spark-ts: https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html https://github.com/sryza/spark-timeseries – oluies Apr 06 '17 at 11:29
  • Thanks for the answer, even a year later :) The data represents multivariate time series, that is, each column is a parameter measured over time. Not sure how approximate quantiles can help me with the moving average, and I would avoid that library as it is third party and no longer developed. Any other idea, perhaps? Does the problem that I'm afraid of really exist? Would I get all the data on one node if I have nothing to partition on? – Kobe-Wan Kenobi Apr 06 '17 at 13:38
  • I think the default partitioner is used http://stackoverflow.com/questions/34491219/default-partitioning-scheme-in-spark – oluies Apr 07 '17 at 14:15
  • what do you say @zero323 ? – oluies Apr 07 '17 at 14:16
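Regarding the comment thread above about having nothing to partition by: you can define the window with only an ordering, but Spark will then pull all rows into a single partition to evaluate it (and logs a warning to that effect), so this only works when the data fits comfortably in one task. A sketch, reusing the dft DataFrame from this answer:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

// No partitionBy: the whole dataset is ordered and windowed in a single partition.
val wAll = Window.orderBy("cykle").rowsBetween(-2, 2)
dft.select($"cykle", avg($"value").over(wAll).alias("moving_avg")).show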