
I am trying to extract features based on a sliding window over time series data. In Scala, it seems there is a `sliding` function, based on this post and the documentation:

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()

My question is: is there a similar function in PySpark? Or how can we achieve a similar sliding window transformation if there is no such function yet?

Bin

3 Answers


As far as I can tell, the `sliding` function is not available from Python, and `SlidingRDD` is a private class that cannot be accessed outside MLlib.

If you want to use sliding on an existing RDD, you can create a poor man's sliding like this:

def sliding(rdd, n):
    assert n > 0
    def gen_window(xi, n):
        x, i = xi
        return [(i - offset, (i, x)) for offset in xrange(n)]

    return (
        rdd.
        zipWithIndex(). # Add index
        flatMap(lambda xi: gen_window(xi, n)). # Generate pairs with offset
        groupByKey(). # Group to create windows
        # Sort values to ensure order inside window and drop indices
        mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
        sortByKey(). # Sort to make sure we keep the original order
        values(). # Get values
        filter(lambda x: len(x) == n)) # Drop beginning and end
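
For a quick sanity check, the helper above can reproduce the moving average from the Scala snippet in the question. This is just a sketch and assumes an active SparkContext `sc` plus the `sliding()` helper defined above:

rdd = sc.parallelize(range(1, 101), 10)

moving_avg = (
    sliding(rdd, 3)
    .map(lambda window: sum(window) / float(len(window)))  # average of each window
    .collect())

moving_avg[:3]  # [2.0, 3.0, 4.0]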

Alternatively, you can try something like this (with a little help from toolz):

from toolz.itertoolz import sliding_window, concat

def sliding2(rdd, n):
    assert n > 1

    def get_last_el(i, iter):
        """Return last n - 1 elements from the partition"""
        return  [(i, [x for x in iter][(-n + 1):])]

    def slide(i, iter):
        """Prepend previous items and return sliding window"""
        return sliding_window(n, concat([last_items.value[i - 1], iter]))

    def clean_last_items(last_items):
        """Adjust for empty or to small partitions"""
        clean = {-1: [None] * (n - 1)}
        for i in range(rdd.getNumPartitions()):
            clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
        return {k: tuple(v) for k, v in clean.items()}

    last_items = sc.broadcast(clean_last_items(
        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))

    return rdd.mapPartitionsWithIndex(slide)
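
Again just a sketch, assuming an active SparkContext `sc`. Note that, as written, the windows produced at the very start come out left-padded with None (because of the {-1: [None] * (n - 1)} entry), so a caller may want to drop those:

rdd = sc.parallelize(range(1, 101), 10)

moving_avg = (
    sliding2(rdd, 3)
    .filter(lambda w: None not in w)  # drop the None-padded leading windows
    .map(lambda w: sum(w) / float(len(w)))
    .collect())
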
zero323
  • Can the equivalent Scala sliding function group by key? I want key-value output: sc.textFile(file).keyBy(x => x.split("\\~") (0)) .map(x => x._2.split("\\~")).map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x => (x.sum/x.size)) – sri hari kali charan Tummala Jul 31 '16 at 06:31
  • ``i, x = xi`` should be changed to ``x, i = xi`` – MyounghoonKim Sep 27 '16 at 07:55
  • sorted(vals) would sort the values and change their original order in the window because the index would be the same due to groupBy. If the original order must be preserved, gen_window() can generate tuples with the original index, as [(i - offset, i, x) for offset in xrange(n)], and extract the values with mapValues(lambda vals: [x for (pos, i, x) in sorted(vals)]). – Farley Mar 04 '17 at 08:00
  • Hand-made sliding windows are unfortunately very slow compared with that provided by Scala RDDFunctions ;-( – Farley Mar 09 '17 at 23:16
  • @Farley You can use Scala functions if you really need to, but it requires a bit of trickery. If you need this feel free to ping me and I'll post the details. – zero323 Mar 10 '17 at 01:25
  • @Farley Maybe not as fast, but reasonably fast with the pickler. You can actually pickle all records, convert the RDD to `RDD[Array[Byte]]`, slide with the Scala slider, go back, and unpickle the results. – zero323 Mar 10 '17 at 11:36
  • @zero323 Thx, I somehow get the idea that it's possible to capture the sequences near the partition boundaries and compute the sliding windows over the 'leftovers.' In this way, the performance seems comparable. – Farley Mar 12 '17 at 21:32
  • @Farley Yeah, but you need an additional action, same as the code above. And since we use iterators we just go to the end without touching each element. It is all about trade-offs. The one tricky part is that sometimes a window can span multiple partitions. – zero323 Mar 13 '17 at 14:27
  • @zero323 Nice catch! Repartitioning could be one workaround rather than concatenating across partitions. – Farley Mar 13 '17 at 19:26

To add to venuktan's answer, here is how to create a time-based sliding window using Spark SQL and retain the full contents of the window, rather than taking an aggregate of it. This was needed in my use case of preprocessing time series data into sliding windows for input into Spark ML.

One limitation of this approach is that we assume you want to take sliding windows over time.

Firstly, you may create your Spark DataFrame, for example by reading in a CSV file:

df = spark.read.csv('foo.csv')

We assume that your CSV file has two columns: one is a Unix timestamp and the other is the column you want to extract sliding windows from.

from pyspark.sql import functions as f

window_duration = '1000 millisecond'
slide_duration = '500 millisecond'

df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) \
    .groupBy(f.window("_c0", window_duration, slide_duration)) \
    .agg(f.collect_list(f.array('_c1'))) \
    .withColumnRenamed('collect_list(array(_c1))', 'sliding_window')
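
For a quick end-to-end check, here is a minimal, self-contained sketch of the same idea. The sample timestamps and values are made up for illustration, an explicit cast to timestamp is added to be safe, and collect_list is applied to the column directly (rather than wrapping it in array) so each window comes out as a flat list:

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data: (unix timestamp in seconds, value)
df = spark.createDataFrame(
    [(1, 10.0), (2, 20.0), (3, 30.0), (4, 40.0)],
    ["_c0", "_c1"])

windowed = (df
    .withColumn("_c0", f.from_unixtime(f.col("_c0")).cast("timestamp"))
    .groupBy(f.window("_c0", "2 seconds", "1 second"))
    .agg(f.collect_list("_c1").alias("sliding_window")))

windowed.show(truncate=False)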

Bonus: to convert this array column to the DenseVector format required for Spark ML, see the UDF approach here.
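
A possible sketch of such a UDF, assuming the window column is a flat array of doubles (the names to_vector, df_ml and windowed come from the sketch above, not from the linked answer):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# Convert an array column into a DenseVector for Spark ML
to_vector = udf(lambda xs: Vectors.dense(xs), VectorUDT())

df_ml = windowed.withColumn("features", to_vector("sliding_window"))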

Extra Bonus: to un-nest the resulting column, such that each element of your sliding window has its own column, try this approach here.

I hope this helps, please let me know if I can clarify anything.

Shane Halloran
  • Hi @shane-halloran, just tried your example locally. I ran into this error: `AttributeError: 'GroupedData' object has no attribute 'withColumn'`. I'm running Spark 2.2.0 on a Mac; are you sure the posted code above is correct? – jay Nov 16 '17 at 16:46
  • @jay Thank you for your question; I have edited my answer to fix the code. The main thing that was wrong was that `withColumn` and `groupBy` were in the wrong order. I am also using the same Spark 2.2.0 on Mac. Let me know if you need further clarification. :) – Shane Halloran Nov 16 '17 at 21:43

Spark 1.4 has window functions, as described here: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
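
For reference, a row-based moving average with the DataFrame window API could look like the sketch below (column names are made up; note zero323's caveat in the comments that without partitionBy everything ends up in a single partition):

from pyspark.sql import SparkSession, functions as f
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(i, float(i)) for i in range(1, 11)], ["bar", "foo"])

# AVG(foo) OVER (ORDER BY bar ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
w = Window.orderBy("bar").rowsBetween(-2, 0)
df.withColumn("moving_avg", f.avg("foo").over(w)).show()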

Hope that helps, please let me know.

venuktan
  • As much as I appreciate window functions, they're not particularly useful in this scenario. To answer this question you need something like `AVG(foo) OVER (ORDER BY bar ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)`. The problem is that Spark would move everything to a single partition to do it. – zero323 Oct 03 '15 at 20:14