
I have a Spark SQL DataFrame with a date column, and what I'm trying to get is all the rows preceding the current row within a given date range. For example, I want all the rows from 7 days back preceding the given row. I figured out that I need to use a Window function like:

Window \
    .partitionBy('id') \
    .orderBy('start')

I want to have a rangeBetween of 7 days, but there is nothing about this that I could find in the Spark docs. Does Spark even provide such an option? For now I'm just getting all the preceding rows with:

.rowsBetween(-sys.maxsize, 0)

but I would like to achieve something like:

.rangeBetween("7 days", 0)

3 Answers


Spark >= 2.3

Since Spark 2.3 it is possible to use interval objects with the SQL API, but DataFrame API support is still a work in progress.

df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id 
        ORDER BY CAST(start AS timestamp) 
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()

## +---+----------+----------+------------------+       
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+
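
If you prefer to stay in the DataFrame API, the same interval frame can usually be written by embedding the whole window expression in `F.expr` (a minimal sketch of an alternative invocation, not the still-in-progress DataFrame support for interval frames; the Spark-version caveat mentioned in the comments below applies here as well):

from pyspark.sql import functions as F

# Same 7-day interval frame, expressed as a SQL window expression inside expr()
df.withColumn(
    "mean",
    F.expr("""
        mean(some_value) OVER (
            PARTITION BY id
            ORDER BY CAST(start AS timestamp)
            RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
        )
    """)
).show()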

Spark < 2.3

As far as I know, it is not directly possible in either Spark or Hive. Both require the ORDER BY clause used with RANGE to be numeric. The closest thing I found is converting to a timestamp and operating on seconds. Assuming the start column contains a date type:

from pyspark.sql import Row
from pyspark.sql.functions import col

row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))

A small helper and window definition:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col


# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400 

Finally, the query:

w = (Window()
   .partitionBy(col("id"))
   .orderBy(col("start").cast("timestamp").cast("long"))
   .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w).alias("mean")).show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Far from pretty, but it works.
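
The same window definition can be reused with other aggregates. For example, a quick sanity check (a minimal sketch that reuses the `df` and `w` defined above) counting how many rows fall inside each 7-day frame:

from pyspark.sql.functions import count

# Number of rows inside the 7-day frame ending at the current row
df.select(col("*"), count("*").over(w).alias("rows_in_window")).show()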


* Hive Language Manual, Types

  • I use Spark 2.3, but the first option doesn't work for me and throws the exception `scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)`. There is a JIRA issue that will be fixed in 2.4.0: https://issues.apache.org/jira/browse/SPARK-25845 – Raman Yelianevich Dec 20 '18 at 13:58
  • Hi, for the last query, can I ask how to include the 'days'? I got "name 'days' is not defined". – Spacez Nov 28 '19 at 03:59
  • @Spacez The "days" helper is declared above as a lambda function that multiplies its argument by 86400 (one day in seconds). – Matt Apr 27 '20 at 17:30
  • `Window.partitionBy(col("id"), pyspark.sql.functions.window("start", "1 day"))` – rjurney Sep 30 '20 at 21:37
  • @zero323, would you like to explain why, in the window function, you add cast('timestamp').cast('long')? Is cast('long') a must? Thank you. – user288609 Apr 01 '21 at 02:23

Spark 3.3 is released, but...

The answer may be as old as Spark 1.5.0: datediff.

datediff(col_name, '1000') will return an integer difference of days from 1000-01-01 to col_name.

As the first argument, it accepts dates, timestamps and even strings.
As the second, it even accepts 1000.
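
A quick way to see why this works as an order key for rangeBetween (a minimal sketch, assuming an active SparkSession named `spark`): the difference between two datediff(..., '1000') values is exactly the gap in days, so a frame of (-7, 0) covers the preceding 7 calendar days.

# The order key advances by exactly one per calendar day
spark.sql(
    "SELECT datediff('2015-01-08', '1000') - datediff('2015-01-01', '1000') AS day_gap"
).show()  # day_gap = 7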


The answer

Date difference in days, depending on the data type of the order column (a complete worked example follows the list):

date

  • Spark 3.1+

    .orderBy(F.expr("unix_date(col_name)")).rangeBetween(-7, 0)
    
  • Spark 2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

timestamp

  • Spark 2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

long - UNIX time in microseconds (e.g. 1672534861000000)

  • Spark 2.1+

    .orderBy(F.col("col_name") / 86400_000000).rangeBetween(-7, 0)
    

long - UNIX time in milliseconds (e.g. 1672534861000)

  • Spark 2.1+

    .orderBy(F.col("col_name") / 86400_000).rangeBetween(-7, 0)
    

long - UNIX time in seconds (e.g. 1672534861)

  • Spark 2.1+

    .orderBy(F.col("col_name") / 86400).rangeBetween(-7, 0)
    

long in format yyyyMMdd

  • Spark 3.3+

    .orderBy(F.expr("unix_date(to_date(col_name, 'yyyyMMdd'))")).rangeBetween(-7, 0)
    
  • Spark 3.1+

    .orderBy(F.expr("unix_date(to_date(cast(col_name as string), 'yyyyMMdd'))")).rangeBetween(-7, 0)
    
  • Spark 2.2+

    .orderBy(F.expr("datediff(to_date(cast(col_name as string), 'yyyyMMdd'), '1000')")).rangeBetween(-7, 0)
    
  • Spark 2.1+

    .orderBy(F.unix_timestamp(F.col("col_name").cast('string'), 'yyyyMMdd') / 86400).rangeBetween(-7, 0)
    

string in date format of 'yyyy-MM-dd'

  • Spark 3.1+

    .orderBy(F.expr("unix_date(to_date(col_name))")).rangeBetween(-7, 0)
    
  • Spark 2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

string in other date format (e.g. 'MM-dd-yyyy')

  • Spark 3.1+

    .orderBy(F.expr("unix_date(to_date(col_name, 'MM-dd-yyyy'))")).rangeBetween(-7, 0)
    
  • Spark 2.2+

    .orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy'), '1000')")).rangeBetween(-7, 0)
    
  • Spark 2.1+

    .orderBy(F.unix_timestamp("col_name", 'MM-dd-yyyy') / 86400).rangeBetween(-7, 0)
    

string in timestamp format of 'yyyy-MM-dd HH:mm:ss'

  • Spark 2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

string in other timestamp format (e.g. 'MM-dd-yyyy HH:mm:ss')

  • Spark 2.2+

    .orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy HH:mm:ss'), '1000')")).rangeBetween(-7, 0)
    
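Complete worked example, using the date recipe for Spark 2.1+ on the sample data from the accepted answer (a sketch; column names follow that answer):

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()

# Sample data borrowed from the accepted answer
df = spark.createDataFrame(
    [(1, "2015-01-01", 20.0), (1, "2015-01-06", 10.0), (1, "2015-01-07", 25.0),
     (1, "2015-01-12", 30.0), (2, "2015-01-01", 5.0), (2, "2015-01-03", 30.0),
     (2, "2015-02-01", 20.0)],
    ["id", "start", "some_value"],
).withColumn("start", F.col("start").cast("date"))

# 'date' column, Spark 2.1+: order by whole days since 1000-01-01
w = (Window
     .partitionBy("id")
     .orderBy(F.expr("datediff(start, '1000')"))
     .rangeBetween(-7, 0))

# Should reproduce the 7-day means shown in the accepted answer
df.withColumn("mean", F.mean("some_value").over(w)).show()
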
  • How can we make this dynamic? I need the range to start on the first day of the current month and end on the current day of the month. – NNM Nov 01 '22 at 16:38

Fantastic solution @zero323. If you want to operate with minutes instead of days, as I had to, and you don't need to partition by id, you only have to modify a small part of the code, as shown below:

df.createOrReplaceTempView("df")
spark.sql(
    """SELECT *, sum(total) OVER (
        ORDER BY CAST(reading_date AS timestamp) 
        RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
     ) AS sum_total FROM df""").show()
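
A minimal sketch of the same 45-minute frame with the DataFrame API, reusing zero323's seconds-based trick (assumes the same `reading_date` and `total` columns):

from pyspark.sql import Window
from pyspark.sql.functions import col, sum as sum_

# 45 minutes expressed in seconds; no partitionBy, so Spark will warn about a single partition
w = (Window
     .orderBy(col("reading_date").cast("timestamp").cast("long"))
     .rangeBetween(-45 * 60, 0))

df.select(col("*"), sum_("total").over(w).alias("sum_total")).show()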