
I have a Spark DataFrame (articleDF1), shown below. I am trying to add two columns, Start Date and End Date, derived from the Date column, and group the resulting DataFrame by post_evar10. The final DataFrame should have post_evar10, Start Date and End Date.

+----------+--------------------+
|      Date|         post_evar10|
+----------+--------------------+
|2019-09-02|www:/espanol/recu...|
|2019-09-02|www:/caregiving/h...|
|2019-12-15|www:/health/condi...|
|2019-09-01|www:/caregiving/h...|
|2019-08-31|www:/travel/trave...|
|2020-01-20|www:/home-family/...|
+----------+--------------------+

What I have tried:

from pyspark.sql import functions as f
articleDF3 = articleDF1.withColumn('Start_Date', f.min(f.col('Date'))).withColumn('End_Date', f.max(f.col('Date'))).groupBy(f.col("post_evar10")).drop("Date")

Getting Error: org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and 'temp.ms_article_lifespan_final.Date' is not an aggregate function. Wrap '(min(temp.ms_article_lifespan_final.Date) AS Start_Date)' in windowing function(s) or wrap 'temp.ms_article_lifespan_final.Date' in first() (or first_value) if you don't care which value you get.;;

NewCode

1 Answer

Is this your expected result?

To get the min and max for each row, we can use a window function, then group by post_evar10 and take the min and max values in the aggregation.

Example:

import sys
from pyspark.sql.window import Window
from pyspark.sql.functions import *

#Sample data
df = sc.parallelize([('2019-09-02', 'www:/espanol/r'),
                     ('2019-09-02', 'www:/caregiving/h'),
                     ('2019-12-15', 'www:/health/condi')]).\
    toDF(['Date', 'post_evar10']).\
    withColumn("Date", col("Date").cast("Date"))

#window on all rows
w = Window.orderBy("Date").rowsBetween(-sys.maxsize, sys.maxsize)
#or
w = Window.orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("min_Date",min("Date").over(w)).\ #get min value for Date
withColumn("max_Date",max("Date").over(w)).\ #get max value for Date
groupBy("post_evar10").\ #groupby on post_evar10
agg(min("min_Date").alias("Start_date"),max("max_Date").alias("End_date")).\ #get min,max
show()

#+-----------------+----------+----------+
#|      post_evar10|Start_date|  End_date|
#+-----------------+----------+----------+
#|   www:/espanol/r|2019-09-02|2019-12-15|
#|www:/caregiving/h|2019-09-02|2019-12-15|
#|www:/health/condi|2019-09-02|2019-12-15|
#+-----------------+----------+----------+

(or)

Or by using the first and last functions over the same window:

df.withColumn("min_Date",first("Date").over(w)).\
withColumn("max_Date",last("Date").over(w)).\
groupBy("post_evar10").\
agg(min("min_Date").alias("Start_date"),max("max_Date").alias("End_date")).\
show()

To generate min and max for each unique post_evar10 value, a plain groupBy with min/max aggregations is enough (the partitioned window below is defined for reference, but the groupBy code that follows does not need it):

w = Window.partitionBy('post_evar10').orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = sc.parallelize([('2019-09-02', 'www:/espanol/r'),
                     ('2019-09-02', 'www:/caregiving/h'),
                     ('2019-09-03', 'www:/caregiving/h'),
                     ('2019-12-15', 'www:/health/condi')]).\
    toDF(['Date', 'post_evar10']).\
    withColumn("Date", col("Date").cast("Date"))

df.groupBy("post_evar10").\
agg(min("Date").alias("Start_date"),max("Date").alias("End_date")).\
show()

#+-----------------+----------+----------+
#|      post_evar10|Start_date|  End_date|
#+-----------------+----------+----------+
#|www:/health/condi|2019-12-15|2019-12-15|
#|   www:/espanol/r|2019-09-02|2019-09-02|
#|www:/caregiving/h|2019-09-02|2019-09-03|
#+-----------------+----------+----------+
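
Applied back to the question's articleDF1 (a minimal sketch, assuming Date is already a date or sortable string column), the same groupBy/agg pattern produces the Start/End dates directly:

from pyspark.sql import functions as f

#aggregate min/max of Date per post_evar10; Date itself is not kept, so no drop is needed
articleDF3 = articleDF1.groupBy("post_evar10").\
    agg(f.min("Date").alias("Start_Date"),
        f.max("Date").alias("End_Date"))
articleDF3.show()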
notNull
  • I tried something similar using a window function (see code below), but did not use the sys function. What is the advantage of using the sys function here? Yes, the resulting DF will have the start and end date of each post_evar10. – NewCode Feb 24 '20 at 16:37
  • from pyspark.sql.window import Window from pyspark.sql import functions as f articleDF1.window = Window.partitionBy('post_evar10').orderBy('Date') articleDF3 = articleDF1.withColumn('Start_Date', f.min(f.col('Date')).over(articleDF1.window)).withColumn('End_Date', f.max(f.col('Date')).over(articleDF1.window)).drop("Date") articleDF3.show() – NewCode Feb 24 '20 at 16:40
  • We can use either `sys.maxsize` or `Window.unboundedPreceding` to set the row frame boundaries; in our case the frame covers all rows, and we calculate the min and max values over it (see the sketch below). https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html https://stackoverflow.com/questions/48138632/in-python-what-is-sys-maxsize – notNull Feb 24 '20 at 18:51
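
A small sketch of that point, reusing the sample df from the answer: both frame specifications effectively cover every row of the ordered window, so min/max computed over either one come out the same.

import sys
from pyspark.sql.window import Window
from pyspark.sql.functions import min, max

#both frames span (effectively) all rows of the ordered window
w1 = Window.orderBy("Date").rowsBetween(-sys.maxsize, sys.maxsize)
w2 = Window.orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

#the four columns below should hold identical min/max values for both windows
df.select(min("Date").over(w1).alias("min_w1"),
          max("Date").over(w1).alias("max_w1"),
          min("Date").over(w2).alias("min_w2"),
          max("Date").over(w2).alias("max_w2")).\
    distinct().show()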