
I have a PySpark DataFrame named h2_df with columns "parsed_date" (dtype: date) and "id" (dtype: bigint), as shown below:

+-------+-----------+
|     id|parsed_date|
+-------+-----------+
|1471783| 2017-12-18|
|1471885| 2017-12-18|
|1472928| 2017-12-19|
|1476917| 2017-12-21|
|1477469| 2017-12-22|
|1478190| 2017-12-22|
|1478570| 2017-12-22|
|1481415| 2017-12-25|
|1472592| 2017-12-19|
|1474023| 2017-12-20|
+-------+-----------+
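
For reproducibility, here is a minimal snippet that builds the same DataFrame (assuming an active SparkSession named spark):

from datetime import date
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rows = [
    (1471783, date(2017, 12, 18)), (1471885, date(2017, 12, 18)),
    (1472928, date(2017, 12, 19)), (1476917, date(2017, 12, 21)),
    (1477469, date(2017, 12, 22)), (1478190, date(2017, 12, 22)),
    (1478570, date(2017, 12, 22)), (1481415, date(2017, 12, 25)),
    (1472592, date(2017, 12, 19)), (1474023, date(2017, 12, 20)),
]
h2_df = spark.createDataFrame(rows, ["id", "parsed_date"])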

I want to create a function to which I pass a date. Inside the function, I want to count the ids (from the dataframe h2_df created outside the function) for each date that lies within two date ranges: range 1 is (day, day+t) and range 2 is (day+t, day+(2*t)), with t = 5. The boundary arithmetic is sketched just below.
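
To make the ranges concrete, here is the boundary arithmetic in plain Python (the variable names are only for illustration):

from datetime import date, timedelta

day = date(2017, 12, 18)
t = timedelta(days=5)

# range 1: (day, day + t)
start_before, end_before = day, day + t          # 2017-12-18 to 2017-12-23
# range 2: (day + t, day + 2*t)
start_after, end_after = day + t, day + 2 * t    # 2017-12-23 to 2017-12-28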

I am new to pyspark, so my attempt below is of course vague and does not work:

from datetime import timedelta as td
from pyspark.sql.functions import when, count

def hypo_2(day):
    t = td(days=5)
    start_date_before = day 
    end_date_before = day+t
    
    start_date_after = day+t
    end_date_after = day+(2*t)
    
    cond_1 = (h2_df["parsed_date"] > start_date_before) & (h2_df["parsed_date"] < end_date_before)
    cond_2 = (h2_df["parsed_date"] > start_date_after) & (h2_df["parsed_date"] < end_date_after)
    
    df_1 = h2_df.withColumn("count_before", when(cond_1, h2_df.groupBy("parsed_date").agg(count("id"))))
    df_2 = h2_df.withColumn("count_after", when(cond_2, h2_df.groupBy("parsed_date").agg(count("id"))))
    

I want a function where I can pass any date and it gives me the count of ids with respect to each date, but only for dates that lie in the range. So every time I call the function, it takes the date, creates the 2 ranges from it, creates 2 dataframes each holding the count of ids for every date in its range, and returns those 2 dataframes.

For example, on calling hypo_2(2017,12,18) the function should return df_1 and df_2. The expected output of df_1 is shown below:

+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1471885| 2017-12-18|            |
|1472928| 2017-12-19|           1|
|1476917| 2017-12-21|           1|
|1477469| 2017-12-22|           3|
|1478190| 2017-12-22|            |
|1478570| 2017-12-22|            |
+-------+-----------+------------+

Please help.

  • Why is `count_before` 2 in the first row? Shouldn't it be 6 as all the rows are within 5 days? – mck Jan 25 '21 at 10:47
  • Hello @SameekshaSohal, this is an exact duplicate of the question you already asked [here](https://stackoverflow.com/questions/65876339/count-value-while-iterate-column-values-inside-user-defined-function-in-pyspark), which was closed for lack of clarity. Please take a few moments to look at [How to make good reproducible Apache Spark examples](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples) and edit your question to make it more understandable for others. – blackbishop Jan 25 '21 at 11:21
  • @mck thank you for looking into it. The count_before is 2 in the first row because there are 2 ids on 2017-12-18, 1 id on 2017-12-19, and so on. And this is for range 1, i.e. (2017-12-18, 2017-12-23). I don't want the total count of ids for the entire range, just the count of ids for each date that is in the range. – Samiksha Jan 25 '21 at 11:38
  • @blackbishop thank you for sharing the reference. I am new to pyspark and Stack Overflow as well. I am trying to refine the question as best I can, and I am really hoping for some help. I hope this time the problem statement is comprehensive. Thanks :) – Samiksha Jan 25 '21 at 11:51
  • @SameekshaSohal then why are the last two rows from the original dataframe not included? – mck Jan 25 '21 at 12:14
  • You are right, those should be included. Thanks for pointing that out, @mck! – Samiksha Jan 25 '21 at 13:08

1 Answer


You can use a filter to select the time interval of interest, and add a column with the count of ids for each parsed_date using a window function:

from pyspark.sql import functions as F, Window

def hypo_2(df, day, t):
    """
    Example usage: df_list = hypo_2(df, '2017-12-18', 5)
    Returns a list of 2 dataframes.
    """
    # range 1: rows with parsed_date in [day, day + t] (BETWEEN is inclusive),
    # with a per-date count over a window partitioned by parsed_date
    df1 = (df.filter(f"parsed_date between '{day}' and '{day}' + interval {t} days")
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    # range 2: rows with parsed_date in [day + t, day + 2*t], counted the same way
    df2 = (df.filter(f"parsed_date between '{day}' + interval {t} days and '{day}' + interval {t*2} days")
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    return [df1, df2]
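
A quick usage sketch, assuming h2_df was built as shown in the question (the counts are derived from the sample data; row order within a date may vary):

df1, df2 = hypo_2(h2_df, '2017-12-18', 5)
df1.show()
# range 1 covers 2017-12-18 to 2017-12-23 inclusive, so count_before should be
# 2 for 2017-12-18, 2 for 2017-12-19, 1 for 2017-12-20, 1 for 2017-12-21,
# and 3 for 2017-12-22.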
  • Also, if I do not have a single day but a list of days (along with df and t) that I want to pass through this function, is that possible? Or should I use a for loop inside the function? Thanks in advance. – Samiksha Jan 25 '21 at 17:19
  • @SameekshaSohal I'm not sure I understand what you're asking. Please open another question for that and provide all necessary details. Thanks :) – mck Jan 25 '21 at 17:26