2

So I have a dataframe and I want to calculation some quantity let's say in daily basis..let's say we have 10 columns col1,col2,col3,col4... coln which each columns are dependent on value col1, col2, col3 , col4.. and so on and the date resets based on the id..

    +--------+----+----              +----+
        date |col1|id  |col2|.    .  |coln
    +--------+----+----              +----+
    2020-08-01| 0|  M1 |   .    .   .    3|
    2020-08-02| 4|  M1 |                10|
    2020-08-03| 3|  M1 |   .     .   .  9 |
    2020-08-04| 2|  M1 |    .   .    .  8 |
    2020-08-05| 1|  M1 |   .   .     .  7 |
    2020-08-06| 0|  M1 |   .    .   .   0 |
    2020-08-01| 0|  M2 |   .   .     .  0 |
    2020-08-02| 0|  M2 |    .   .   . . 1 |
    2020-08-03| 0|  M2 |    .   .  . .  2 |
   +---------+----+----+-----------------+   

Let's say we execute this dataframe, there could be alot more columns in this df... So to make this clear, let's say today's date is 2020-08-01. and we do some calculation and we got some output at coln let's say coln =3 at 2020-08-01, and I want to coln == col1 at 2020-08-02 which is col1 ==3 and carry on the calculation at 2020-08-02 and so on... so example of df looks like this below

    +--------+----+----              +----+
        date |col1|id  |col2|.    .  |coln
    +--------+----+----              +----+
    2020-08-01| 0|  M1 |   .    .   .    3|
    2020-08-02| 3|  M1 |                10|
    2020-08-03|10|  M1 |   .     .   .  9 |
    2020-08-04| 9|  M1 |    .   .    .  8 |
    2020-08-05| 8|  M1 |   .   .     .  7 |
    2020-08-06| 7|  M1 |   .    .   .   0 |
    2020-08-01| 0|  M2 |   .   .     .  1 |
    2020-08-02| 1|  M2 |    .   .   . . 2 |
    2020-08-03| 2|  M2 |    .   .  . .  0 |
   +---------+----+----+-----------------+   
  

It would be great if you guys can give me an example how this can be done in pyspark..

example: let's say col3 = col1+ col2 and initally, let's say col1 is all 0.

df1_schema = StructType([StructField("Date", StringType(), True),\
                              StructField("col1", IntegerType(), True),\
                             StructField("id", StringType(), True),\
                       StructField("col2", IntegerType(), True),\
                       StructField("col3", IntegerType(), True),\
                        StructField("coln", IntegerType(), True)])
df_data = [('2020-08-01',0,'M1',3,3,2),('2020-08-02',0,'M1',2,3,1),\
           ('2020-08-03',0,'M1',3,3,3),('2020-08-04',0,'M1',3,3,1),\
            ('2020-08-01',0,'M2',1,3,1),('2020-08-02',0,'M2',-1,3,2)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date",to_date("Date", 'yyyy-MM-dd'))
df1.show()

+----------+----+---+----+----+----+
|      Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|
|2020-08-02|   0| M1|   2|   3|   1|
|2020-08-03|   0| M1|   3|   3|   3|
|2020-08-04|   0| M1|   3|   3|   1|
|2020-08-01|   0| M2|   1|   3|   1|
|2020-08-02|   0| M2|  -1|   3|   2|
+----------+----+---+----+----+----+

So Let's focus on 2020-08-01 which is the beginning, and what we want is col1+col2 which is 3 = col3. and after nth calculation that is dependent on col3.. col4... col5.. let's say we got to some number coln= 3. after that calculation is done, we want at 2020-08-02, that coln=3 should be at col1 so it's a dynamically changing after 2020-08-01 calculation is complete

enter image description here

so my desired df would look like this

+----------+----+---+----+----+----+
|      Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|
|2020-08-02|   2| M1|   2|   5|   1|
|2020-08-03|   1| M1|   3|   4|   3|
|2020-08-04|   3| M1|   3|   6|   1|
|2020-08-01|   1| M2|   1|   4|   1|
|2020-08-02|   1| M2|  -1|   0|   2|
+----------+----+---+----+----+----+

EDIT 2:

df1_schema = StructType([StructField("Date", StringType(), True),\
                              StructField("col1", IntegerType(), True),\
                             StructField("id", StringType(), True),\
                       StructField("col2", IntegerType(), True),\
                       StructField("col3", IntegerType(), True),\
                       StructField("col4", IntegerType(), True),\
                        StructField("coln", IntegerType(), True)])
df_data = [('2020-08-01',0,'M1',3,3,2,2),('2020-08-02',0,'M1',2,3,0,1),\
           ('2020-08-03',0,'M1',3,3,2,3),('2020-08-04',0,'M1',3,3,2,1),\
            ('2020-08-01',0,'M2',1,3,3,1),('2020-08-02',0,'M2',-1,3,1,2)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date",to_date("Date", 'yyyy-MM-dd'))
df1.show()
+----------+----+---+----+----+----+----+
|      Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|   2|
|2020-08-02|   0| M1|   2|   3|   0|   1|
|2020-08-03|   0| M1|   3|   3|   2|   3|
|2020-08-04|   0| M1|   3|   3|   2|   1|
|2020-08-01|   0| M2|   1|   3|   3|   1|
|2020-08-02|   0| M2|  -1|   3|   1|   2|
+----------+----+---+----+----+----+----+

so let's say coln = col4 - col2 then

+----------+----+---+----+----+----+----+
|      Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|  -1|
|2020-08-02|  -1| M1|   2|   1|   0|  -2|
|2020-08-03|  -2| M1|   3|   1|   2|  -1|
|2020-08-04|  -1| M1|   3|   2|   2|  -1|
|2020-08-01|   0| M2|   1|   1|   3|   2|
|2020-08-02|   2| M2|  -1|   1|   1|   2|
+----------+----+---+----+----+----+----+
hellotherebj
  • 121
  • 8
  • from your diagram for id=M1, it looks you just need a Window funcion to calculate col1 from lag('coln'), and then calculate col3: for example let set `w1 = Window.partitionBy('id').orderBy('Date')`, then do `df1.withColumn('col1', F.coalesce(F.lag('coln').over(w1),F.col('col1'))).withColumn('col3', F.col('col1') + F.col('col2')).show()`. but this does not match your desired df, which one is correct? – jxc Sep 30 '20 at 21:04
  • ah yes yours are correct. the starting date 2020-08-01 at col1 should be 0 – hellotherebj Sep 30 '20 at 21:11
  • how about values in col3? – jxc Sep 30 '20 at 21:14
  • yes you are correct. but when we are calculating 2020-08-01, the value of coln at 2020-08-02 would be blank/unknown because it did not do the calculation yet. you know what i mean? so let's say I have a script. so at line 50 that does the calculation of col3. and after all the col4,col5,col6 calculation let's say at line 1000, we have calculation for coln. I want to bring that coln value to col1 again like going back to the start and do the calculation for 2020-08-02 and so on. – hellotherebj Sep 30 '20 at 21:19
  • is that even possible? – hellotherebj Sep 30 '20 at 21:19
  • so you want to use the updated value of `coln` on `2020-08-01` to calculate the `col1` on `2020-08-02`? if so, it's possible to handle this using aggregate function, if you can add an example, for example coln = `col2 - col4` etc and update your diagram for desired result. you might have to pack related calculations all in one function though. – jxc Sep 30 '20 at 21:33
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/222333/discussion-between-hellotherebj-and-jxc). – hellotherebj Sep 30 '20 at 22:37

1 Answers1

2

This is one type of questions you can handle with SparkSQL builtin function aggregate (require Spark 2.4+), below outlines the basic idea:

from pyspark.sql.functions import sort_array, collect_list, struct, to_date

cols = ['Date', 'col1', 'col2', 'col3', 'coln']

df_new = df1.groupby('id') \
    .agg(sort_array(collect_list(struct(*cols))).alias('dta')) \
    .selectExpr("id", """  
      inline( 
        aggregate( 
          /* expr: iterate through the array `dta` from the 2nd to the last items*/
          slice(dta,2,size(dta)-1), 
          /* start: AKA. the zero value which is an array of structs 
           * with a single element dta[0]
           */
          array(dta[0]), 
          /* merge: do the calculations */
          (acc, x) ->   
            concat(acc, array(named_struct( 
              'Date', x.Date, 
              'col1', element_at(acc, -1).coln, 
              'col2', x.col2, 
              'col3', element_at(acc, -1).col3 + x.col2, 
              'coln', x.col3 - x.col2 
            )))  
         )    
       )    
   """)

Output:

df_new.show()
+---+----------+----+----+----+----+ 
| id|      Date|col1|col2|col3|coln|
+---+----------+----+----+----+----+
| M1|2020-08-01|   0|   3|   3|   2|
| M1|2020-08-02|   2|   2|   5|   1|
| M1|2020-08-03|   1|   3|   8|   0|
| M1|2020-08-04|   0|   3|  11|   0|
| M2|2020-08-01|   0|   1|   3|   1|
| M2|2020-08-02|   1|  -1|   2|   4|
+---+----------+----+----+----+----+

Where:

  1. we groupby rows for the same id and sort them by Date, name the resulting array of structs as dta

  2. in the aggregate function, we initialize acc with an array of structs array(dta[0]) and then iterate through the array dta from the 2nd item to the last item using slice function

  3. in the merge part of the aggregate function, you can use x.col1, x.coln etc to refer to values on the same Date and use element_at(acc, -1).col1, element_at(acc, -1).coln etc to refer the values from the previous Date.

  4. in the merge function, we use concat(acc, array(...)) to append a new element to the array of structs acc

  5. use inline function to explode the above array of structs acc

  6. this assumed Dates are continuous, if missing date exists, you can add some IF conditions. for example to calculate col3 below:

    IF(datediff(x.Date, element_at(acc, -1).Date) = 1, element_at(acc, -1).coln, 0) + x.col2
    

BTW. I did not use the example coln = col4 - col2, using con3 = col3_prev + col2 instead, I think, is a better example.

jxc
  • 13,553
  • 4
  • 16
  • 34
  • could you have look at this problem jxc? https://stackoverflow.com/questions/64177603/how-to-loop-dataframe-in-pyspark – hellotherebj Oct 02 '20 at 20:04
  • hi, @hellotherebj, is the new question related to this post, I think you can just use a similar way here but groupby('Date'), sort the collect_list by `id` (desc or asc). – jxc Oct 02 '20 at 21:41
  • hi jxc, for this problem, what if I want to lag `col2` (any window functions) and use that to get coln? is it possible to lag in array form in general? – hellotherebj Oct 15 '20 at 22:27
  • @hellotherebj, use `element_at(acc, -1).col2` if you want the col2 which is just updated in the previous iteration. – jxc Oct 15 '20 at 22:30
  • can we implement https://stackoverflow.com/questions/63290611/pyspark-how-to-code-complicated-dataframe-calculation this complicated calculation here somehow? – hellotherebj Oct 29 '20 at 17:25
  • hi, @hellotherebj. if you are looking to resolve a big time series when the related rows have to be distributed on multiple partitions. there is probably no easy way to handle in Spark. this code is only logically working for a dataset which can be easily loaded into memory. BTW, the first argument of aggregate function could be `slice(dta,2,size(dta))`, just skip the `minus 1` to the length of the slice which yield ERROR for EMPTY arrays. – jxc Oct 29 '20 at 18:45
  • Thank you jxc, all I want is build a logic like this one that does it in recursive, like referring coln = col1 for tomorrow.. and all other columns are depended on that col1 value.. what would be some options?, one option is to change the df to the array (pandas df) ??.. any advice? but I am not sure how to implement it.. – hellotherebj Oct 29 '20 at 19:13