
I have an ETL script using Pandas, and to make it more scalable I am trying to recreate it with PySpark. I've got everything working so far, but I'm having issues with a particular transformation to a daily dataset. I have one record per ID with a start date and an end date:

id  age state   start_date  end_date
123 18  CA     2/17/2019    5/4/2019
223 24  AZ     1/17/2019    3/4/2019
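
For reproducibility, the sample data can be built like this:

import pandas as pd

# Sample records from the table above
df = pd.DataFrame({
    'id': [123, 223],
    'age': [18, 24],
    'state': ['CA', 'AZ'],
    'start_date': ['2/17/2019', '1/17/2019'],
    'end_date': ['5/4/2019', '3/4/2019'],
})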

I want to create a record for each day between the start and end dates, so I can join daily activity data to it. The target output would look something like this:

id  age state   date
123 18  CA      2/17/2019
123 18  CA      2/18/2019
123 18  CA      2/19/2019
123 18  CA      2/20/2019
123 18  CA      2/21/2019
            …
123 18  CA      5/2/2019
123 18  CA      5/3/2019
123 18  CA      5/4/2019

And of course I need to do this for all IDs and their respective date ranges in the dataset. I was able to do this in Pandas using the following approach:

# Unpivot start_date/end_date into a single 'date' column, one row per boundary
melt = df.melt(id_vars=['id', 'age', 'state'], value_name='date').drop('variable', axis=1)
melt['date'] = pd.to_datetime(melt['date'])

# Per id, resample to daily frequency between the two boundary dates,
# then forward-fill id/age/state onto the newly created rows
melt = melt.groupby('id').apply(lambda x: x.set_index('date').resample('d').first())\
           .ffill()\
           .reset_index(level=1)\
           .reset_index(drop=True)

But I am fairly new to PySpark (and was struggling with this in Pandas too), so I'm stuck here. Any help is much appreciated - thanks!

  • Does this answer your question? [PySpark: how to resample frequencies](https://stackoverflow.com/questions/39271374/pyspark-how-to-resample-frequencies) – user10938362 Mar 04 '20 at 17:00
  • Definitely looks like a good place to start. Thanks! – L Xandor Mar 04 '20 at 17:03
  • So that's getting closer, but I'm not clear on how to do this across multiple ids. Basically, in that example he creates a time series with the required intervals and then joins the observations from a single variable back to it. But in my case each ID should have its own range of daily intervals, and these intervals will be different for each ID. – L Xandor Mar 04 '20 at 17:24

1 Answer

Found the solution in this post. The key for my solution was the explode function, which turns each element of an array column into its own row - exactly what I need.
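
To illustrate what explode does on its own (a minimal sketch, assuming an active SparkSession named spark):

from pyspark.sql.functions import explode, array, lit

# explode() emits one output row per element of the array column
spark.createDataFrame([(1,)], ['id'])\
     .withColumn('val', explode(array(lit('a'), lit('b'))))\
     .show()
# +---+---+
# | id|val|
# +---+---+
# |  1|  a|
# |  1|  b|
# +---+---+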

The code that solves my specific example is:

from pyspark.sql.functions import udf, expr, explode, from_unixtime, min, max
from pyspark.sql.types import ArrayType, LongType

# All unix timestamps from t1 to t2 inclusive, stepping one day at a time
def date_range(t1, t2, step=60*60*24):
    return [t1 + step*x for x in range(int((t2-t1)/step)+1)]

date_range_udf = udf(date_range, ArrayType(LongType()))

# Unpivot start_date/end_date into a single 'date' column (two rows per id).
# dataF is the input DataFrame; its date columns must be timestamps so the
# cast to integer below yields unix seconds.
df = dataF.select(
    "id",
    expr("stack(2, 'start_date', start_date, 'end_date', end_date) as (class_date, date)"))

# Per id: take the min and max boundary, expand them into one row per day,
# and convert the unix seconds back to a timestamp string.
# Note: min/max here are the Spark column functions, shadowing the builtins.
df_base = df.groupBy('id')\
    .agg(min('date').cast('integer').alias('date_min'),
         max('date').cast('integer').alias('date_max'))\
    .withColumn("date", explode(date_range_udf("date_min", "date_max")))\
    .drop('date_min', 'date_max')\
    .withColumn("date", from_unixtime("date"))

This gives the following output, to which I can join any additional daily data:

(screenshot of the resulting DataFrame: one row per id for every date between that id's start_date and end_date)
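
As a side note, on Spark 2.4+ the built-in sequence function can generate the per-id date range without a UDF at all. A sketch, assuming start_date and end_date are already date-typed columns on the input frame dataF:

from pyspark.sql.functions import explode, expr

# sequence() builds an array of dates from start_date to end_date, one per day;
# explode() then turns that array into one row per date
df_base = dataF.select(
    'id', 'age', 'state',
    explode(expr("sequence(start_date, end_date, interval 1 day)")).alias('date'))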
