
I have a given dataframe that looks like this:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import to_date

TEST_schema = StructType([StructField("Date", StringType(), True),
                          StructField("START", StringType(), True),
                          StructField("quantity", IntegerType(), True),
                          StructField("col1", StringType(), True),
                          StructField("col2", StringType(), True)])
TEST_data = [('2020-08-15', '2020-08-19', 1, '2020-08-05', '2020-08-09'),
             ('2020-08-16', '2020-08-19', 2, '2020-08-05', '2020-08-09'),
             ('2020-08-17', '2020-08-19', 3, '2020-08-06', '2020-08-09'),
             ('2020-08-18', '2020-08-19', 4, '2020-08-10', '2020-08-11'),
             ('2020-08-19', '2020-08-19', 5, '2020-08-16', '2020-08-19'),
             ('2020-08-20', '2020-08-19', 6, '2020-08-20', '2020-08-25'),
             ('2020-08-21', '2020-08-19', 7, '2020-08-20', '2020-08-21'),
             ('2020-08-22', '2020-08-19', 8, '2020-08-19', '2020-08-24'),
             ('2020-08-23', '2020-08-19', 9, '2020-08-05', '2020-08-09')]

TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
TEST_df = TEST_df.withColumn("Date", to_date("Date"))\
                 .withColumn("START", to_date("START"))\
                 .withColumn("col1", to_date("col1"))\
                 .withColumn("col2", to_date("col2"))

TEST_df.show()

+----------+----------+--------+----------+----------+
|      Date|     START|quantity|      col1|      col2|
+----------+----------+--------+----------+----------+
|2020-08-15|2020-08-19|       1|2020-08-05|2020-08-09|
|2020-08-16|2020-08-19|       2|2020-08-05|2020-08-09|
|2020-08-17|2020-08-19|       3|2020-08-06|2020-08-09|
|2020-08-18|2020-08-19|       4|2020-08-10|2020-08-11|
|2020-08-19|2020-08-19|       5|2020-08-16|2020-08-19|
|2020-08-20|2020-08-19|       6|2020-08-20|2020-08-25|
|2020-08-21|2020-08-19|       7|2020-08-20|2020-08-21|
|2020-08-22|2020-08-19|       8|2020-08-19|2020-08-24|
|2020-08-23|2020-08-19|       9|2020-08-05|2020-08-09|
+----------+----------+--------+----------+----------+

where col1 and col2 may not be unique, Date is just an incremental date, and START is a single date shared by all rows. My logic is: if START == col2, then lag(quantity, offset=datediff(col2, col1), 0), otherwise 0. In this case datediff(col2, col1) is 3 days. Attempt 1:

from pyspark.sql.functions import when, col, datediff, expr

TEST_df = TEST_df.withColumn('datedifff', datediff(col('col2'), col('col1')))\
                 .withColumn('want', expr("IF(START == col2, lag(quantity, datedifff, 0), 0)"))

which throws an error: the offset passed to lag() must be a constant integer literal (and lag() also needs an OVER/window clause), so a per-row offset like datedifff cannot be used this way...
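For reference, here is a minimal sketch of that constraint, assuming a window ordered by Date (the window w and the column name lag_fixed_3 are mine, just for illustration): a hard-coded offset runs fine, but it cannot vary per row, so it cannot express datediff(col2, col1) directly.

# Sketch only: lag() accepts a constant offset such as 3 (the datediff of the single
# matching row in this sample), but not a per-row offset, so this is not a general fix.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.orderBy("Date")   # no partitionBy: fine for this tiny sample

TEST_df.withColumn(
    "lag_fixed_3",
    F.when(F.col("START") == F.col("col2"),
           F.coalesce(F.lag("quantity", 3).over(w), F.lit(0)))
     .otherwise(0)
).show()

Note that this pulls a value from an earlier row with a fixed offset; the result I actually want (described in the comments below) needs the quantity pushed forward to a later Date, which is why a join was suggested instead.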

so my result df will have a new `want` column that is 0 on every row except Date = 2020-08-23, where it should be 5: the quantity from the row where START == col2 (Date = 2020-08-19), carried forward datediff(col2, col1) + 1 = 4 days.

  • it looks the diagram does not match what you described in the post? also, can the calculated `datedifff` be negative? – jxc Aug 19 '20 at 03:31
  • uhm. we just care about when START == col2, and take the date difference from col2 and col1, which is 3 days; we count 3 days below and put 5 at the 4th day, so 2020-08-23 should have quantity 5. sorry about the confusion, the diagram should be the correct one – hellotherebj Aug 19 '20 at 03:35
  • `datediff` can not be negative. – hellotherebj Aug 19 '20 at 03:38
  • what if more than one rows satisfy `START == col2`, and some of them are pointed to the same Date after adding datedifff? – jxc Aug 19 '20 at 03:41
  • in that case we lag that as well. and yes, you are right, there might be more than one row satisfying `START == col2`, and I don't think it's possible that they will overlap. – hellotherebj Aug 19 '20 at 03:45
  • what if on Date = "2020-08-18", we adjust col1 = "2020-08-15" and col2 = "2020-08-19", so the `want` on Date = "2020-08-23" be 4 or 5? – jxc Aug 19 '20 at 03:49
  • i think it's impossible, i remember there must be "one" non-zero value in "want" column and others are all "zero".. and even if they do, they won't overlap. i can get back to you this tomorrow – hellotherebj Aug 19 '20 at 03:55
  • maybe you can try collecting all Rows with matched START=col2 into a new dataframe, calculate the new Date `df_match = TEST_df.filter("START = col2").withColumn('Date', expr("date_add(Date, datediff(col2, col1)+1)")).select('Date', 'quantity')`, then do a left join with the original TEST_df, for example: `TEST_df.join(df_match.alias('dm'),'Date','left').withColumn('want', expr("coalesce(dm.quantity,0)")).show()` (a runnable sketch of this approach follows after the comments) – jxc Aug 19 '20 at 04:12
  • ah i see... you left join by Date and maybe need to orderBy('Date').. awesome.. i guess lag() function won't work.. – hellotherebj Aug 19 '20 at 04:21
  • Could you have a look at this problem ?https://stackoverflow.com/questions/63494242/pyspark-how-to-calculate-poisson-distribution-using-udf – hellotherebj Aug 19 '20 at 22:42
  • I am using python APIs and use them on a pyspark dataframe via the udf method; I tried and it keeps failing :( – hellotherebj Aug 19 '20 at 22:42
  • I am not familiar with this module, if you have a working code with Pandas/Numpy/Scipy, most likely you can convert it into pandas_udf. – jxc Aug 19 '20 at 22:59
  • Thank you jxc for previous question, could you look at this new question? it's a timeseries question.. https://stackoverflow.com/questions/64144891/how-to-calculate-daily-basis-in-pyspark-dataframe-time-series – hellotherebj Sep 30 '20 at 20:01
  • Hi, hellotherebj, your new question is not very clear, it looks you want to change `coln` based on the values of `col1`, but in both dataframes, col1 values also change, can you make a diagram to outline the logic similar to those in your previous posts? BTW. I am not sure if I understood your previous question correctly, if it solves your question, can you upvote my answer? – jxc Sep 30 '20 at 20:23
  • I've updated with an example.. and upvoted.. maybe I need a second df that calculates this so like filter(date = 2020-08-01) and do the calculation , and join with the main df?? – hellotherebj Sep 30 '20 at 20:44
  • or loop the entire calculation by dates... – hellotherebj Sep 30 '20 at 20:47
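For completeness, here is a runnable sketch of the join-based approach from jxc's comment above, assuming TEST_df is built as at the top of the question (the names df_match and result are just illustrative):

from pyspark.sql import functions as F

# Rows where START == col2, with Date shifted to where the quantity should land:
# date_add(Date, datediff(col2, col1) + 1).
df_match = (TEST_df.filter("START = col2")
                   .withColumn("Date", F.expr("date_add(Date, datediff(col2, col1) + 1)"))
                   .select("Date", "quantity"))

# Left-join back on Date; rows with no match get 0.
result = (TEST_df.join(df_match.alias("dm"), "Date", "left")
                 .withColumn("want", F.expr("coalesce(dm.quantity, 0)"))
                 .orderBy("Date"))

result.show()

With the sample data above, `want` comes out as 5 on Date = 2020-08-23 and 0 everywhere else, matching the behaviour described in the comments.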
