I have a large table with columns including id, time_create, and state.

I want to turn it into a new table with columns id, date, last_state.

This is very easy with pandas:

import pandas as pd

# parse timestamps, index and sort by time, then take the last row per id per day
df['time_create'] = pd.to_datetime(df['time_create'])
df = df.set_index('time_create')
df = df.sort_index()
df = df.groupby('id').resample('D').last().reset_index()

But it is hard to implement in PySpark.

Here is what I know:

  1. The resample equivalent in PySpark is groupBy + window:

    from pyspark.sql.functions import window, sum

    # group by product and a 1-day tumbling window, then aggregate
    grouped = df.groupBy('store_product_id', window("time_create", "1 day")).agg(sum("Production").alias('Sum Production'))
    

    Here we group by store_product_id, resample by day, and compute the sum.

  2. Group by and take the first or last row:

    refer to https://stackoverflow.com/a/35226857/1637673

    from pyspark.sql import Window
    from pyspark.sql.functions import col, row_number

    # number rows within each store_product_id, newest first, and keep only the first
    w = Window().partitionBy("store_product_id").orderBy(col("time_create").desc())
    (df
      .withColumn("rn", row_number().over(w))
      .where(col("rn") == 1)
      .select("store_product_id", "time_create", "state"))
    

    This groups by id and takes the last row ordered by time_create.

However, what I need is to group by id, resample by day, and then take the last row ordered by time_create.

I know this problem could be solved with a pandas UDF (see Applying UDFs on GroupedData in PySpark (with functioning python example)).
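For reference, here is a rough sketch of that pandas UDF route (assuming Spark 3.0+'s applyInPandas; on 2.x the same idea works with a GROUPED_MAP pandas_udf, and the resample_last helper and schema types below are my own guesses based on the columns used above):

import pandas as pd

# hypothetical grouped-map function: reproduces the pandas resample logic within each id group
def resample_last(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf = pdf.assign(time_create=pd.to_datetime(pdf['time_create'])).sort_values('time_create')
    out = (pdf.set_index('time_create')
              .resample('D').last()
              .dropna(subset=['state'])   # drop empty days introduced by resample
              .reset_index())
    return out[['id', 'time_create', 'state']]

# the column types in the schema are assumptions; adjust them to the real table
result = df.groupBy('id').applyInPandas(
    resample_last, schema='id string, time_create timestamp, state string')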

But is there any way to do this with plain PySpark?

  • Can't you just include the day in your window? Something like `Window().partitionBy("store_product_id", dayofmonth(col("time_create"))).orderBy(col("time_create").desc())` – gaw Apr 12 '19 at 09:28
  • @gaw Sounds right. I forgot that `partitionBy` can take multiple columns. In that case I shouldn't use `dayofmonth`; I need to add a new date column derived from `time_create`. But won't there be too many partitions? My dataset spans 2016 to 2019 with over 200 million rows. – Mithril Apr 12 '19 at 09:52
  • I think this should still be possible. In these 4 years you have roughly 1400 days times the different products, but Spark should be able to handle that. Just make sure to use the day together with month and year. I applied a window on a big proxy dataset partitioned by client IP and destination URL and it still worked; I think I had many more partitions in that use case :) – gaw Apr 12 '19 at 13:15

1 Answer

Just partitionBy("store_product_id", "date") does the trick, after adding a date column derived from time_create:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, to_date

# add a calendar-day column, then keep the latest row per (store_product_id, date)
df = df.withColumn("date", to_date(col("time_create")))
w = Window().partitionBy("store_product_id", "date").orderBy(col("time_create").desc())
x = (df
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .select("store_product_id", "time_create", "state"))
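If the output should match the id / date / last_state layout asked for in the question, a small variation of the final select gives exactly that (the alias is only for readability):

result = (df
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .select("store_product_id", "date", col("state").alias("last_state")))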