
I am new to programming and am cleaning up my code that performs a groupby and aggregation on a PySpark DataFrame, refactoring it to make it easier to follow. When I try the following code, I get an error:

TypeError: Invalid argument, not a string or column: 

Here is my code:

from pyspark.sql import functions as F

groupBy = ['ColA']

convert_to_list = ['Col1', 'Col2', 'Col3',]
convert_to_set = ['Col4', 'Col5', 'Col6',]

fun_list = [F.collect_list]
funs_set = [F.collect_set]

exprs = F.concat(
    [f(F.col(c)) for f in fun_list for c in convert_to_list], 
    [f(F.col(c)) for f in funs_set for c in convert_to_set]
)

df = df.groupby(*groupBy).agg(*exprs)

I am unsure how to pass the right columns to the `agg` function. Really appreciate your help.

Sample input and expected output

[sample input and expected output were provided as an image]

  • Welcome to Stack Overflow. Could you please provide sample input data and the expected output, in addition to your code? It helps users solve your issue. Thanks. – C.S.Reddy Gadipally Jun 27 '19 at 15:42
  • The problem is almost certainly in the call to `concat` - you're passing in 2 lists, when it expects columns. Try `exprs = [f(F.col(c)) for f in fun_list for c in convert_to_list] + [f(F.col(c)) for f in funs_set for c in convert_to_set]` instead. Or maybe you want `exprs = concat(*([f(F.col(c)) for f in fun_list for c in convert_to_list] + [f(F.col(c)) for f in funs_set for c in convert_to_set]))`. Hard to tell without a [mcve]. – pault Jun 27 '19 at 15:49
  • @pault, The first method worked! The reasoning makes perfect sense too. Thank you so much. Really appreciate it, mate. – Rob Jun 27 '19 at 16:04

1 Answer


Your problem is the call to `concat` - you're passing in two lists, when it expects its arguments to be columns. It seems that you wanted to concatenate the lists themselves, which can be done using the addition operator in Python.
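
In plain Python, the + operator joins two lists into one (a quick illustration, nothing Spark-specific):

list_a = ['x', 'y']
list_b = ['z']
list_a + list_b   # ['x', 'y', 'z'] - one combined list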

Try:

exprs = (
    [f(F.col(c)).alias(c) for f in fun_list for c in convert_to_list] + 
    [f(F.col(c)).alias(c) for f in funs_set for c in convert_to_set]
)

I also added in a call to alias so the column names are maintained after the aggregation.
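
For reference, here is a minimal end-to-end sketch with invented sample data (the original sample was only shared as an image, so the values below are made up):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Invented stand-in for the sample data in the question
df = spark.createDataFrame(
    [('a', 1, 10), ('a', 2, 10), ('b', 3, 30)],
    ['ColA', 'Col1', 'Col4'],
)

groupBy = ['ColA']
convert_to_list = ['Col1']   # aggregate with collect_list
convert_to_set = ['Col4']    # aggregate with collect_set

fun_list = [F.collect_list]
funs_set = [F.collect_set]

# Join the two lists of Column expressions with +, then unpack into agg
exprs = (
    [f(F.col(c)).alias(c) for f in fun_list for c in convert_to_list] +
    [f(F.col(c)).alias(c) for f in funs_set for c in convert_to_set]
)

df.groupby(*groupBy).agg(*exprs).show()
# For ColA = 'a' this gives Col1 = [1, 2] and Col4 = [10]
# (element order inside collect_list/collect_set is not guaranteed)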

pault
  • This works great, thanks. Now I tried to convert this to a UDF, and I get an error: ``` @F.udf def aggregation(df, groupby_column, cols_to_list, cols_to_set): fun_list = [F.collect_list] funs_set = [F.collect_set] exprs = [F.collect_set(F.col(c)).alias(c) for c in cols_to_list]\ + [F.collect_set(F.col(c)).alias(c) in funs_set for c in cols_to_set] return df.groupby(*groupby_column).agg(*exprs) ``` ``` TypeError: Invalid argument, not a string or column: DataFrame ``` Do you know what might be happening here? – Rob Jun 27 '19 at 17:02
  • It's hard to interpret code in a comment - try posting a new question, but be sure to include a [reproducible example](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples) and [don't post pictures of code or data](https://meta.stackoverflow.com/questions/285551/why-not-upload-images-of-code-on-so-when-asking-a-question). – pault Jun 27 '19 at 17:14
  • Done. If you get an opportunity, I would really appreciate your help, Paul. Link: https://stackoverflow.com/questions/56796231/how-to-convert-a-group-and-agg-code-on-a-dataframe-to-a-udf-in-pyspark – Rob Jun 27 '19 at 17:45