
I am new to Spark and need help applying a condition-based groupBy. Below is my current output:

+----------+------------------+-----------------+----------+---------+------------+------+----------+--------+----------------+
|account_id|credit_card_Number|credit_card_limit|first_name|last_name|phone_number|amount|      date|    shop|transaction_code|
+----------+------------------+-----------------+----------+---------+------------+------+----------+--------+----------------+
|     12345|      123456789123|           100000|       abc|      xyz|  1234567890|  1000|01/06/2020|  amazon|             buy|
|     12345|      123456789123|           100000|       abc|      xyz|  1234567890|  1100|02/06/2020|    ebay|             buy|
|     12345|      123456789123|           100000|       abc|      xyz|  1234567890|   500|02/06/2020|  amazon|            sell|
|     12345|      123456789123|           100000|       abc|      xyz|  1234567890|   200|03/06/2020|flipkart|             buy|
|     12345|      123456789123|           100000|       abc|      xyz|  1234567890|  4000|04/06/2020|    ebay|             buy|
|     12345|      123456789123|           100000|       abc|      xyz|  1234567890|   900|05/06/2020|  amazon|             buy|
+----------+------------------+-----------------+----------+---------+------------+------+----------+--------+----------------+

I need to group by date and, in addition, create a new column holding the balance left for that date, based on whether the transaction code is 'buy' or 'sell'.

For example, in the first row the amount is 1000 and the transaction code is 'buy', so I subtract 1000 from the credit limit (100000) and store the new value, 99000, in a new column.

For the second date there are two rows, one buy (1100) and one sell (500). Here I should subtract 1100 from the previous output (i.e. 99000) and add 500 to it, so the output for 02/06/2020 is 98400.

Expected output: an additional column attached to the above dataframe after grouping by date

Credit_left
99000
98400
98200
94200
93300
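
To make the arithmetic above concrete, here is a minimal plain-Python sketch (not Spark) of the rule being described: net each date's transactions, counting 'buy' as negative and 'sell' as positive, then carry a running balance forward from the credit limit. The function name `credit_left_by_date` is hypothetical, and the sketch assumes the rows are already in date order.

```python
# Rows from the question: (date, amount, transaction_code)
transactions = [
    ("01/06/2020", 1000, "buy"),
    ("02/06/2020", 1100, "buy"),
    ("02/06/2020", 500, "sell"),
    ("03/06/2020", 200, "buy"),
    ("04/06/2020", 4000, "buy"),
    ("05/06/2020", 900, "buy"),
]
credit_limit = 100000


def credit_left_by_date(rows, limit):
    """Return {date: balance after all of that date's transactions}.

    Hypothetical helper; assumes rows are already sorted by date.
    """
    # Step 1: net amount per date ('buy' subtracts, 'sell' adds).
    net_by_date = {}
    for date, amount, code in rows:
        signed = -amount if code == "buy" else amount
        net_by_date[date] = net_by_date.get(date, 0) + signed

    # Step 2: running balance, starting from the credit limit.
    balance = limit
    result = {}
    for date, net in net_by_date.items():
        balance += net
        result[date] = balance
    return result


balances = credit_left_by_date(transactions, credit_limit)
for date, left in balances.items():
    print(date, left)
```

The two steps map onto a grouped aggregation followed by a cumulative sum, which is the shape a Spark solution would need as well.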

Below is the schema of this table

root
 |-- account_id: long (nullable = true)
 |-- credit_card_Number: long (nullable = true)
 |-- credit_card_limit: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- phone_number: long (nullable = true)
 |-- amount: long (nullable = true)
 |-- date: string (nullable = true)
 |-- shop: string (nullable = true)
 |-- transaction_code: string (nullable = true)

This seems like a complex task and I couldn't find an answer for it. Please help me solve this problem. Thanks a lot!

keerthi007
  • Your output doesn't have the same number of rows as the original data frame, it can't be appended as a new column – Psidom Jun 30 '20 at 04:10
  • I need to group by date and apply this condition while grouping. Since two rows share the same date, they will be grouped together, so the final output will have 5 rows – keerthi007 Jun 30 '20 at 04:12
  • @keerthi007 Can you give a reproducible data sample? – pissall Jun 30 '20 at 05:32
  • Can you explain what you mean by a reproducible data sample? Sorry, I am a newbie to Spark. Thanks a lot! – keerthi007 Jun 30 '20 at 05:34

1 Answer


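One way to implement this is to aggregate the signed amounts per date and then take a running sum over a window:

from pyspark.sql import Window
import pyspark.sql.functions as f

# Running-sum window ordered by date (no partitionBy, so all rows share one partition)
w = Window.orderBy('date')

# 'buy' reduces the available credit; any other code (here 'sell') restores it
signed_amount = f.when(f.col('transaction_code') == 'buy', -f.col('amount')) \
    .otherwise(f.col('amount'))

# Net the amounts per date, then add the cumulative total to the credit limit
df.groupBy('date', 'credit_card_limit', 'credit_card_Number') \
    .agg(f.sum(signed_amount).alias('expenses')) \
    .select('*', (f.col('credit_card_limit') + f.sum(f.col('expenses')).over(w)).alias('Credit_left')) \
    .show()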

+----------+-----------------+------------------+--------+-----------+
|      date|credit_card_limit|credit_card_Number|expenses|Credit_left|
+----------+-----------------+------------------+--------+-----------+
|01/06/2020|           100000|      123456789123| -1000.0|    99000.0|
|02/06/2020|           100000|      123456789123|  -600.0|    98400.0|
|03/06/2020|           100000|      123456789123|  -200.0|    98200.0|
|04/06/2020|           100000|      123456789123| -4000.0|    94200.0|
|05/06/2020|           100000|      123456789123|  -900.0|    93300.0|
+----------+-----------------+------------------+--------+-----------+

Hope it helps :)

Shubham Jain
  • Getting error as ' NameError: name 'f' is not defined ' I added ' import pyspark.sql.functions as f ' to your solution – keerthi007 Jun 30 '20 at 06:09
  • Great solution ! Thanks a lot for your support :) – keerthi007 Jun 30 '20 at 06:12
  • I get a warning like this 'WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation' Is this a huge problem ? How to overcome that ? – keerthi007 Jun 30 '20 at 07:26
  • When you have data for multiple credit cards, you can partition on the credit card number. Since your data contains only one credit card, it should not create any issue – Shubham Jain Jun 30 '20 at 07:31
  • I also get this warning when I apply this code for many users data where I have separate credit card info for each user. How to solve that ? – keerthi007 Jun 30 '20 at 07:34
  • That warning is just to remind that in case of skewness in data there will be uneven partitioning. So if your data is equally distributed amongst all partitions then you are good to go – Shubham Jain Jun 30 '20 at 07:41
  • Also refer to this question for more details: https://stackoverflow.com/questions/41313488/avoid-performance-impact-of-a-single-partition-mode-in-spark-window-functions – Shubham Jain Jun 30 '20 at 07:45