
I need to group row-based values against each index from the data frame below.

+-----+------+------+------+------+------+------+
|index|amount| dept | date |amount| dept | date |
+-----+------+------+------+------+------+------+
|    1|  1000| acnt |2-4-21|  2000| acnt2|2-4-21|
|    2|  1500| sales|2-3-21|  1600|sales2|2-3-21|
+-----+------+------+------+------+------+------+

Since the index is unique to each row and the dates are the same, I need to group the row values as below:

+-----+---------+------------+------+
|index|amount   |dept        |date  |
+-----+---------+------------+------+
|    1|1000,2000|acnt,acnt2  |2-4-21|
|    2|1500,1600|sales,sales2|2-3-21|
+-----+---------+------------+------+

I see many options for grouping columns, but specifically for row-based values in PySpark, is there any solution to populate the result as above?
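For reference, a minimal sketch to reproduce an input DataFrame with the duplicate column names described above (the literal values and the `spark` session name are assumptions for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Duplicate column names are allowed when the schema is passed as a list of names
data = [
    (1, 1000, "acnt",  "2-4-21", 2000, "acnt2",  "2-4-21"),
    (2, 1500, "sales", "2-3-21", 1600, "sales2", "2-3-21"),
]
df = spark.createDataFrame(data, ["index", "amount", "dept", "date", "amount", "dept", "date"])
df.show()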

cloud_hari

3 Answers


There are two ways, depending on what you want:

from pyspark.sql.functions import struct, array, col

df = df.withColumn('amount', struct(col('amount1'), col('amount2')))  # struct column
df = df.withColumn('amount', array(col('amount1'), col('amount2')))   # array column

If there are two columns with the same name (like in your example), just recreate your DataFrame.
(If it comes from a join, there is no need; just use aliases, as sketched after the snippet below.)

cols = ['index','amount1','dept', 'amount2', 'dept2', 'date']
df = df.toDF(*cols)
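As a rough illustration of the alias route (the inputs `df1`/`df2` and their values are assumptions, not from the original post), aliasing each side of the join keeps the duplicated names addressable and lets you build the comma-separated columns directly:

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col

spark = SparkSession.builder.getOrCreate()

# Hypothetical inputs: two frames that both carry 'amount', 'dept' and 'date' columns
df1 = spark.createDataFrame([(1, 1000, "acnt", "2-4-21")], ["index", "amount", "dept", "date"])
df2 = spark.createDataFrame([(1, 2000, "acnt2", "2-4-21")], ["index", "amount", "dept", "date"])

# Alias each side so the duplicated names stay addressable after the join
joined = df1.alias("a").join(df2.alias("b"), on="index")

result = joined.select(
    "index",
    concat_ws(",", col("a.amount"), col("b.amount")).alias("amount"),
    concat_ws(",", col("a.dept"), col("b.dept")).alias("dept"),
    col("a.date").alias("date"),
)
result.show()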
SakuraFreak
  • Hi @Sakura, I am not sure how to use the select expression dynamically from the first approach, since I need to pass columns dynamically. With the second approach it gives an error saying "the number of columns does not match", even after I removed the duplicate column names as you suggested – cloud_hari Mar 21 '22 at 07:32
  • Edited the answer from expr to struct. And forgot to add the asterisk in cols... – SakuraFreak Mar 21 '22 at 14:23

Ideally this needs to be fixed upstream (check whether you have joins in your upstream code and select only the appropriate aliases so you retain unique columns).

That said, you can build a list of deduplicated column names with a helper dictionary, then use a helper function built from Spark's built-in functions:

from pyspark.sql import functions as F
from itertools import groupby

Create a fresh list with a counter:

l = []
s = {}
for i in df.columns:
    l.append(f"{i}_{s.get(i)}" if i in s else i)
    s[i] = s.get(i,0)+1
#['index', 'amount', 'dept', 'date', 'amount_1', 'dept_1', 'date_1']

Then rename the existing dataframe with this new list and use a helper function to concat columns based on duplicate checks:

def mysparkfunc(cols):
    cols = [list(v) for k,v in groupby(sorted(cols),lambda x: x.split("_")[0])]
    return [F.concat_ws(",",*col).alias(col[0]) 
            if len(col)>1 and col[0]!= 'date' 
            else F.col(col[0]) for col in cols]

df.toDF(*l).select(*mysparkfunc(l)).show()

+---------+------+------------+-----+
|   amount|  date|        dept|index|
+---------+------+------------+-----+
|1000,2000|2-4-21|  acnt,acnt2|    1|
|1500,1600|2-3-21|sales,sales2|    2|
+---------+------+------------+-----+

Full Code:

from pyspark.sql import functions as F
from itertools import groupby

l = []
s = {}
for i in df.columns:
    l.append(f"{i}_{s.get(i)}" if i in s else i)
    s[i] = s.get(i,0)+1
def mysparkfunc(cols):
    cols = [list(v) for k,v in groupby(sorted(cols),lambda x: x.split("_")[0])]
    return [F.concat_ws(",",*col).alias(col[0]) 
            if len(col)>1 and col[0]!= 'date' 
            else F.col(col[0]) for col in cols]

df.toDF(*l).select(*mysparkfunc(l)).show()
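Because the helper sorts the column names, the result comes out in alphabetical order; if you want the original ordering back, one possible follow-up select (assuming the same column names as above) is:

# Restore the original column order after the concatenation
df.toDF(*l).select(*mysparkfunc(l)).select('index', 'amount', 'dept', 'date').show()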
anky
    Good stuff mate! – Dipanjan Mallick Mar 21 '22 at 13:32
  • UDFs are not recommended due to performance. PySpark makes it worse since it doesn't work directly with the JVM. – SakuraFreak Mar 21 '22 at 14:26
  • @SakuraFreak this is not a Python UDF that I have used, but a function using built-in Spark functions. More reading here: https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance. Besides, this can only be done dynamically using loops and Spark built-ins (which my function does here) – anky Mar 21 '22 at 14:28
  • Oh, now I understand. Nice to know about helpers. Didn't think about using them like that. – SakuraFreak Mar 21 '22 at 14:37

Let's say you have an initial data frame as shown below:

INPUT:
+------+------+------+------+
|  dept|  dept|amount|amount|
+------+------+------+------+
|sales1|sales2|     1|     1|
|sales1|sales2|     2|     2|
|sales1|sales2|     3|     3|
|sales1|sales2|     4|     4|
|sales1|sales2|     5|     5|
+------+------+------+------+
  1. Rename the columns:
newColumns = ["dept1","dept2","amount1","amount2"]    
new_clms_df = df.toDF(*newColumns)
new_clms_df.show()

    +------+------+-------+-------+
    | dept1| dept2|amount1|amount2|
    +------+------+-------+-------+
    |sales1|sales2|      1|      1|
    |sales1|sales2|      2|      2|
    |sales1|sales2|      3|      3|
    |sales1|sales2|      4|      4|
    |sales1|sales2|      5|      5|
    +------+------+-------+-------+
  2. Derive the final output columns:
from pyspark.sql.functions import concat_ws

final_df = new_clms_df.\
            withColumn('dept', concat_ws(',', new_clms_df['dept1'], new_clms_df['dept2'])).\
            withColumn('amount', concat_ws(',', new_clms_df['amount1'], new_clms_df['amount2']))
final_df.show()


+------+------+-------+-------+-------------+------+
| dept1| dept2|amount1|amount2|         dept|amount|
+------+------+-------+-------+-------------+------+
|sales1|sales2|      1|      1|sales1,sales2|   1,1|
|sales1|sales2|      2|      2|sales1,sales2|   2,2|
|sales1|sales2|      3|      3|sales1,sales2|   3,3|
|sales1|sales2|      4|      4|sales1,sales2|   4,4|
|sales1|sales2|      5|      5|sales1,sales2|   5,5|
+------+------+-------+-------+-------------+------+
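If you only want the combined columns in the final output, a possible follow-up (not part of the original steps, assuming the column names used above) is to drop the intermediate ones:

# Keep only the concatenated columns; the original renamed columns are dropped
final_df.drop('dept1', 'dept2', 'amount1', 'amount2').show()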
Bhanu Tej P