I am fairly new to PySpark and am looking for the best way to perform the following calculation. I have this data frame:
+-------------+------------+--------------+------------+------------+-----+
|invoice_month|invoice_year|start_date_key|end_date_key|invoice_days| cost|
+-------------+------------+--------------+------------+------------+-----+
| 11| 2007| 20071022| 20071120| 30| 100|
| 12| 2007| 20071121| 20071220| 30| 160|
| 5| 2014| 20140423| 20140522| 30| 600|
| 5| 2005| 20050503| 20050602| 31| 470|
| 7| 2012| 20120702| 20120801| 31| 200|
| 7| 2013| 20130712| 20130812| 32| 300|
| 2| 2010| 20100212| 20100316| 33| 640|
| 12| 2013| 20130619| 20130828| 71| 820|
+-------------+------------+--------------+------------+------------+-----+
What I am trying to calculate is the calendarized cost by invoice month and year. For example, the first invoice spans two months (October and November 2007), so the prorated November cost for the first invoice should be 20/30 * 100 = 66.67. The prorated November cost for the second invoice should be 10/30 (from 11-21 to 11-30) * 160 = 53.33. So the calendarized cost for November 2007 should be 66.67 + 53.33 = 120.
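To double-check my own arithmetic, here is a small plain-Python version of that proration (the overlap_days helper is just something I wrote for this example, not part of my actual pipeline):

    from datetime import date

    def overlap_days(start, end, month_start, month_end):
        """Days (inclusive) that the invoice period [start, end] overlaps a calendar month."""
        lo = max(start, month_start)
        hi = min(end, month_end)
        return max((hi - lo).days + 1, 0)

    # First invoice: 2007-10-22 to 2007-11-20, 30 invoice days, cost 100
    d1 = overlap_days(date(2007, 10, 22), date(2007, 11, 20),
                      date(2007, 11, 1), date(2007, 11, 30))   # 20 days in November
    # Second invoice: 2007-11-21 to 2007-12-20, 30 invoice days, cost 160
    d2 = overlap_days(date(2007, 11, 21), date(2007, 12, 20),
                      date(2007, 11, 1), date(2007, 11, 30))   # 10 days in November

    nov_2007 = d1 / 30 * 100 + d2 / 30 * 160   # 66.67 + 53.33 = 120.0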
My initial thought was a brute-force approach: create a separate data frame, go through the unique (invoice_month, invoice_year) tuples row by row, join back to the original data frame to select all the invoices whose start_date_key / end_date_key range overlaps that month, and calculate the prorated cost for each. The calculation gets even trickier when an invoice spans more than two months, like the last one. Would there be a way to expand the existing data frame and create additional weighted columns based on start_date_key and end_date_key? For example, I would create 201306, 201307, and 201308 columns for the last invoice so that I can calculate the weighted cost for each month and then aggregate.
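To illustrate that expansion idea, here is a rough sketch of what I have in mind using sequence / explode (assuming Spark 2.4+ for sequence, that the data frame is called df, and that start_date_key / end_date_key are yyyyMMdd values), though I am not sure this is the right direction:

    import pyspark.sql.functions as F

    exploded = (df
        .withColumn("start_date", F.to_date(F.col("start_date_key").cast("string"), "yyyyMMdd"))
        .withColumn("end_date",   F.to_date(F.col("end_date_key").cast("string"), "yyyyMMdd"))
        # one row per calendar month the invoice touches
        .withColumn("month_start",
                    F.explode(F.sequence(F.trunc("start_date", "month"),
                                         F.trunc("end_date", "month"),
                                         F.expr("interval 1 month"))))
        .withColumn("month_end", F.last_day("month_start"))
        # days of the invoice that fall inside this calendar month (inclusive)
        .withColumn("overlap_days",
                    F.datediff(F.least("end_date", "month_end"),
                               F.greatest("start_date", "month_start")) + 1)
        .withColumn("prorated_cost",
                    F.col("overlap_days") / F.col("invoice_days") * F.col("cost")))

    calendarized = (exploded
        .groupBy(F.year("month_start").alias("year"), F.month("month_start").alias("month"))
        .agg(F.round(F.sum("prorated_cost"), 2).alias("calendarized_cost")))

The groupBy should then give one row per calendar month, e.g. 120.0 for (2007, 11) as in the example above.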
I am not sure if there is a more efficient way of doing it; any hints would be much appreciated!