
Given a PySpark DataFrame df with columns 'ProductId', 'Date' and 'Price', how safe is it to sort by 'Date' and assume that func.first('Price') will always retrieve the Price corresponding to the minimum date?

I mean: will
df.orderBy('ProductId', 'Date').groupBy('ProductId').agg(func.first('Price'))
return for each product the first price paid in time without messing with the orderBy while grouping?

foebu
  • I also found [this scala post](https://stackoverflow.com/questions/39505599/spark-dataframe-does-groupby-after-orderby-maintain-that-order) where the accepted answer says that order is preserved, but the discussion in the comments and other answers say otherwise. – pault Feb 23 '18 at 15:44

1 Answer

I am not sure whether the order is guaranteed to be maintained by groupBy(). However, here is an alternative way to do what you want that will work.

Use pyspark.sql.Window to partition and order the DataFrame as desired. Then use pyspark.sql.DataFrame.distinct() to drop the duplicate entries.

For example:

Create Dummy Data

data = [
    (123, '2017-07-01', 50),
    (123, '2017-01-01', 100),
    (345, '2018-01-01', 20),
    (123, '2017-03-01', 25),
    (345, '2018-02-01', 33)
]

# sqlCtx is an existing SQLContext; with a SparkSession, call spark.createDataFrame instead
df = sqlCtx.createDataFrame(data, ['ProductId', 'Date', 'Price'])
df.show()
#+---------+----------+-----+
#|ProductId|      Date|Price|
#+---------+----------+-----+
#|      123|2017-07-01|   50|
#|      123|2017-01-01|  100|
#|      345|2018-01-01|   20|
#|      123|2017-03-01|   25|
#|      345|2018-02-01|   33|
#+---------+----------+-----+

Use Window

Use Window.partitionBy('ProductId').orderBy('Date'):

import pyspark.sql.functions as f
from pyspark.sql import Window

df.select(
    'ProductId',
    f.first('Price').over(Window.partitionBy('ProductId').orderBy('Date')).alias('Price')
).distinct().show()
#+---------+-----+
#|ProductId|Price|
#+---------+-----+
#|      123|  100|
#|      345|   20|
#+---------+-----+

Edit

I found [this Scala post](https://stackoverflow.com/questions/39505599/spark-dataframe-does-groupby-after-orderby-maintain-that-order) in which the accepted answer says that the order is preserved, though the discussion in the comments contradicts that.

pault
  • Well, I need to group by because I need to aggregate on other columns. My real case is a bit more complicated. – foebu Feb 23 '18 at 15:40
  • You can extend this to multiple aggregations. Just add more items to the `select` statement. You can also define the window outside to save typing: `w = Window.partitionBy('ProductId').orderBy('Date')` and then simplify to `f.first('Price').over(w).alias(...)` – pault Feb 23 '18 at 15:43
  • Isn't it possible to use the window function inside the aggregator of a groupby? Like `df.groupBy('Product').agg(f.first('Price').over(w).alias('First price'))`, where `w=Window.partitionBy('Product').orderBy('Date')`. – foebu Feb 23 '18 at 15:45
  • I couldn't get that syntax to work (doesn't mean it's not possible). – pault Feb 23 '18 at 15:49