
I have delivery data and I want to sum the total ordered net_wt per cust_id. In most cases it's easy, as I can sum everything by grouping on cust_id. However, I also need to keep the delivery date, and that's where I ran into trouble: some orders have multiple delivery dates, and I am not sure how to keep at least the latest delivery date alongside the sum of all orders irrespective of the delivery date.

cust_id     date_delivery   log_type    net_wt
4776210      3/4/2021         Bulk      3880
4776210      3/4/2021         Bulk      6160
4787563      3/20/2021        Bulk      10360
4787563      3/20/2021        Bulk      3800
4787563      3/20/2021        Bulk      5020
4787563      3/20/2021        Bulk      2120
4787563      3/25/2021        Bulk      2100
4787563      3/25/2021        Bulk      2140
4792002      3/27/2021        Bulk      9042
4790494      3/25/2021        Bulk      3718
4790494      3/25/2021        Bulk      8102

required output

cust_id     date_delivery   log_type    total_order
4776210      3/4/2021         Bulk      10040
4787563      ????????         Bulk      25540
4790494      3/25/2021        Bulk      11820

I have tried

df.createOrReplaceTempView('df')
df_test = spark.sql("""
SELECT cust_id, date_delivery,
       SUM(net_wt) AS `total_order`
FROM df
GROUP BY 1
""")
display(df_test)

but it's not working. I want one row per cust_id with at least one corresponding date (it could be the most recent delivery_date).

Any help would be appreciated.

Thanks in advance

user3459293

2 Answers


You can apply an aggregation function (like max in this case) to a date column as well, provided the column is stored as an actual date rather than a string. For that reason, we first convert it to date type with to_date.

import pyspark.sql.functions as F

# run the following command only if you have Spark>=3.0, see:
# https://stackoverflow.com/questions/62943941/to-date-fails-to-parse-date-in-spark-3-0
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# convert string type to actual date
df = df.withColumn('date_delivery', F.to_date('date_delivery', 'MM/dd/yyyy'))

# get aggregations
df_new = df\
  .groupby('cust_id')\
  .agg({
    'date_delivery': 'max',
    'log_type': 'max',
    'net_wt': 'sum'
  })

# if you need to rename the columns
df_new = df_new.toDF('cust_id', 'log_type', 'net_wt', 'date_delivery')

df_new.show()

+-------+--------+------+-------------+
|cust_id|log_type|net_wt|date_delivery|
+-------+--------+------+-------------+
|4776210|    Bulk| 10040|   2021-03-04|
|4787563|    Bulk| 25540|   2021-03-25|
|4790494|    Bulk| 11820|   2021-03-25|
|4792002|    Bulk|  9042|   2021-03-27|
+-------+--------+------+-------------+
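
If you prefer not to rely on the column order produced by the dict-style agg, a minimal alternative sketch (assuming the same df, with date_delivery already converted to a date) uses explicit aliases so the output column names are fixed:

import pyspark.sql.functions as F  # same import as above

# aggregate with explicit aliases; names no longer depend on agg ordering
df_new = df.groupby('cust_id').agg(
    F.max('date_delivery').alias('date_delivery'),
    F.max('log_type').alias('log_type'),
    F.sum('net_wt').alias('total_order')
)

df_new.show()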
Ric S

Just apply max to the date column:

>>> df.show()
+-------+---------+----+----+
|     id|       dt| log|  wt|
+-------+---------+----+----+
|4776210| 3/4/2021|Bulk| 400|
|4776210|3/14/2021|Bulk|1400|
|4787563|3/24/2021|Bulk| 200|
|4787563|3/14/2021|Bulk|4400|
|4787563| 3/4/2021|Bulk| 500|
+-------+---------+----+----+

>>> df.createOrReplaceTempView('df')
>>> spark.sql('''select id, max(to_date(dt,'M/d/yyyy')) as dt,log,sum(wt) as wt from df group by id,log''').show()
+-------+----------+----+------+
|     id|        dt| log|    wt|
+-------+----------+----+------+
|4776210|2021-03-14|Bulk|1800.0|
|4787563|2021-03-24|Bulk|5100.0|
+-------+----------+----+------+
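
Adapted to the column names from the question, the same idea would look roughly like this (a sketch, not run against the original data):

>>> df.createOrReplaceTempView('df')
>>> spark.sql('''SELECT cust_id, MAX(to_date(date_delivery, 'M/d/yyyy')) AS date_delivery, log_type, SUM(net_wt) AS total_order FROM df GROUP BY cust_id, log_type''').show()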
Bala