
I have a column containing an array of dates, and I would like to know the most efficient way to derive a new column containing an array of the intervals (in days) between those dates.

ID DATES
X [01-01-2001, 03-01-2001, 10-01-2001, 25-01-2001]
Y ...
Z ...

Should become:

ID DATES INTERVALS
X [01-01-2001, 03-01-2001, 10-01-2001, 25-01-2001] [2, 7, 15]
Y ...
Z ...

I have a UDF that parses the array and subtracts the dates one by one, but I'm sure there is a more efficient way to do it. Thank you!
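
For reference, a minimal sketch of the kind of UDF described above (the original UDF isn't shown; the dd-MM-yyyy parsing and the names used here are assumptions):

import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

# Hypothetical baseline: parse dd-MM-yyyy strings and subtract consecutive dates.
@udf(returnType=ArrayType(IntegerType()))
def date_intervals(dates):
    parsed = [datetime.datetime.strptime(d, "%d-%m-%Y").date() for d in dates]
    return [(later - earlier).days for earlier, later in zip(parsed, parsed[1:])]

# df.withColumn("INTERVALS", date_intervals("DATES"))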

bAN

2 Answers


One way is to use the transform function as shown below:

from pyspark.sql.functions import expr

df = spark.createDataFrame([
    [["01-01-2001", "03-01-2001", "10-01-2001", "25-01-2001"]]
], ["dates"])

# Parse the dd-MM-yyyy strings into dates, then compute the gap in days between
# each date and its predecessor; the -1 placeholder for the first item is removed.
df.withColumn("dates", expr("transform(dates, x -> to_date(x,'dd-MM-yyyy'))"))\
  .withColumn("diff", expr("array_remove(transform(dates, (x, i) -> if(i > 0, datediff(dates[i], dates[i-1]), -1)), -1)"))\
  .show(100, False)

# +------------------------------------------------+----------+
# |dates                                           |diff      |
# +------------------------------------------------+----------+
# |[01-01-2001, 03-01-2001, 10-01-2001, 25-01-2001]|[2, 7, 15]|
# +------------------------------------------------+----------+

With the first transformation we convert the string items to dates. Then we iterate over each item, computing the expression if(i > 0, datediff(dates[i], dates[i-1]), -1).

Explanation

  • When the index (i) > 0, take the difference between the current date and the previous one using datediff, which returns the difference in days.
  • Otherwise return -1. This placeholder always has to be removed afterwards; we use -1 instead of null because it is easier to remove from an array.
  • Finally, array_remove deletes the redundant -1 entries from the array. A variant that avoids the placeholder altogether is sketched below.
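
As a variant of the same idea (assuming Spark 2.4+, where slice and zip_with are available), the placeholder can be avoided by zipping the date array with itself shifted by one position; this builds on the same df as above:

from pyspark.sql.functions import expr

# slice(dates, 2, size(dates) - 1) is dates[1:] and slice(dates, 1, size(dates) - 1)
# is dates[:-1]; zip_with pairs them positionally and computes the day difference.
df.withColumn("dates", expr("transform(dates, x -> to_date(x, 'dd-MM-yyyy'))")) \
  .withColumn("diff", expr("zip_with(slice(dates, 2, size(dates) - 1), slice(dates, 1, size(dates) - 1), (cur, prev) -> datediff(cur, prev))")) \
  .show(100, False)

Since zip_with pairs elements positionally, no sentinel value or array_remove call is needed; the result for the sample row is again [2, 7, 15].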
abiratsis

If you have a very large data frame and/or the sizes of your date arrays vary greatly, using the Spark SQL functions to explode, calculate, and re-collect your values may be more performant than a UDF.

Setup:

import datetime

df = spark.createDataFrame([
  {
    "ID": "X", 
    "DATES": [
      datetime.date(2001, 1, 1), 
      datetime.date(2001, 1, 3), 
      datetime.date(2001, 1, 10), 
      datetime.date(2001, 1, 25)
    ]
  },
  {
    "ID": "Y", 
    "DATES": [
      datetime.date(2003, 1, 1), 
      datetime.date(2006, 3, 2), 
      datetime.date(2007, 5, 1), 
      datetime.date(2009, 8, 2)
    ]
  }
])

The most complex part of this approach is ensuring that your dates and the resulting intervals remain in order throughout, which is accomplished below with a window function:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("ID").orderBy("DATE")

# Explode DATES into one row per date, rebuild the array as a running
# collect_list over the ordered window, collect the day gaps to the previous
# date (lag/datediff; collect_list skips the first row's null gap), then keep
# the complete arrays per ID.
df_intervals = df \
  .select("ID", F.explode("DATES").alias("DATE")) \
  .withColumn("DATES", F.collect_list("DATE").over(w)) \
  .withColumn("INTERVALS", F.collect_list(F.datediff("DATE", F.lag("DATE").over(w))).over(w)) \
  .groupBy("ID") \
  .agg(F.max("DATES").alias("DATES"), F.max("INTERVALS").alias("INTERVALS"))
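
A quick way to sanity-check the result (usage sketch; per the question, ID X should end up with INTERVALS = [2, 7, 15]):

df_intervals.orderBy("ID").show(truncate=False)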
  • Thank you. Took me some time to understand the steps but it is perfect for what I wanted to do. – bAN Sep 02 '22 at 10:43