
I am working with a PySpark DataFrame of users, dates and locations. My goal is to implement a sliding window of 3 days ([-1 day, +1 day]) and calculate the most common location inside each window.

+---+-------------+----+------------+---------+
| ID|         date| loc| GOAL_Window| GOAL_Loc|
+---+-------------+----+------------+---------+
|ID1|   2017-07-01|  L1|     [L1,L1]|       L1|
|ID1|   2017-07-02|  L1|  [L1,L1,L5]|       L1|
|ID1|   2017-07-03|  L5|  [L1,L5,L1]|       L1|
|ID1|   2017-07-04|  L1|  [L5,L1,L5]|       L5|
|ID1|   2017-07-05|  L5|  [L1,L5,L5]|       L5|
|ID1|   2017-07-06|  L5|     [L5,L5]|       L5|
|ID1|   2017-07-08|  L5|        [L5]|       L5|
|ID2|   2017-07-01|  L0|     [L0,L0]|       L0|
|ID2|   2017-07-02|  L0|        [L0]|       L0|
+---+-------------+----+------------+---------+

For each row, I need to:

  1. Select the sliding window. Applied with:

from pyspark.sql import Window
import pyspark.sql.functions as F

days = lambda i: i * 86400  # convert a number of days to seconds for the range frame
w = Window.partitionBy('ID').orderBy(F.col('date').cast('timestamp').cast('long')) \
     .rangeBetween(days(-1), days(1))

  2. Create a UDF that is applied to the window and calculates:
    a. for each element of the window, the number of times each location appears;
    b. the series of counts, ordered in descending order;
    c. the location ID with the highest count.

I applied it taking inspiration from: Improve Pandas UDF in Pyspark
import pandas as pd
from typing import List
from pyspark.sql.types import StringType

@F.udf(StringType())
def pd_ctfirst(dt: List[str]) -> str:
    # count occurrences of each location in the window, then return the most frequent one
    df = pd.DataFrame({'loc': dt})
    df = df.reset_index().groupby('loc').count()
    return str(df.reset_index().sort_values('index', ascending=False)['loc'].values[0])
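As a sanity check on steps 2.a–2.c, the same count-and-sort logic can be run in plain pandas on a hand-made window (the sample list below is made up for illustration):

import pandas as pd

window = ['L1', 'L5', 'L1']  # hypothetical window contents for 2017-07-03
df = pd.DataFrame({'loc': window})
# 2.a: count how often each location appears in the window
counts = df.reset_index().groupby('loc').count()
# 2.b + 2.c: sort the counts in descending order and take the top location
top = counts.reset_index().sort_values('index', ascending=False)['loc'].values[0]
print(top)  # 'L1'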

df_ = df_.withColumn('GOAL_Window', F.collect_list(F.col('loc')).over(w)) \
         .withColumn('GOAL_Loc', pd_ctfirst(F.collect_list(F.col('loc')).over(w)))

When inspecting the DataFrame (e.g. with df_.take()), the result looks correct, but I am unable to save or order it. When I try, I get the error:

IndexError: index 0 is out of bounds for axis 0 with size 0
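For reference, the IndexError itself is reproducible in plain pandas whenever the input list is empty, which makes me suspect the UDF receives an empty window for some rows:

import pandas as pd

df = pd.DataFrame({'loc': []})  # empty window
df = df.reset_index().groupby('loc').count()
# raises IndexError: index 0 is out of bounds for axis 0 with size 0
df.reset_index().sort_values('index', ascending=False)['loc'].values[0]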

Any suggestions on why I get the error, and how to apply the calculation to each window slide? Thanks in advance!

1 Answer


In case someone finds themselves in a similar situation: I managed to overcome this problem by adding a check on the DataFrame size:

@F.udf(StringType())
def pd_ctfirst(dt: List[str]) -> str:
    df = pd.DataFrame({'loc': dt})
    if df.size != 0:
        df = df.reset_index().groupby('loc').count()
        return str(df.reset_index().sort_values('index', ascending=False)['loc'].values[0])
    else:
        # empty window: return null instead of indexing into an empty frame
        return None
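With the guard in place, the UDF can be applied exactly as in the question; a minimal sketch, assuming df_ and w are defined as above:

df_ = df_.withColumn('GOAL_Window', F.collect_list(F.col('loc')).over(w)) \
         .withColumn('GOAL_Loc', pd_ctfirst(F.collect_list(F.col('loc')).over(w)))
df_.orderBy('ID', 'date').show()  # ordering no longer triggers the IndexError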