
I am using Python 2.6.6 and Spark 1.6.0. I have a DataFrame like this:

id | name      |  number |
-------------------------- 
1  | joe       | 148590  |
2  | bob       | 148590  |
2  | steve     | 279109  |
3  | sue       | 382901  |
3  | linda     | 148590  |

Whenever I try to run something like `df2 = df.groupBy('id','length','type').pivot('id').agg(collect_list('name'))`, I get the following error: `pyspark.sql.utils.AnalysisException: u'undefined function collect_list;'`. Why is this?

I have also tried `hive_context = HiveContext(sc)` followed by `df2 = df.groupBy('id','length','type').pivot('id').agg(hive_context.collect_list('name'))` and get the error:

AttributeError: 'HiveContext' object has no attribute 'collect_list'

formicaman

1 Answer


Here `collect_list` looks like a user-defined function. The PySpark API only supports a handful of predefined functions like `sum`, `count`, etc.

If you are referring to any other code, please ensure you have the `collect_list` function defined somewhere. To import the `collect_list` function, add the line below at the top:

from pyspark.sql import functions as F

Then change your code to:

df.groupBy('id','length','type').pivot('id').agg(F.collect_list('name'))
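For reference, `collect_list` simply gathers every value in a group into a list, keeping duplicates (unlike `collect_set`). A rough plain-Python sketch of that grouping semantics, using the sample rows from the question (this is just an illustration, not how Spark executes it):

```python
from collections import defaultdict

# Sample rows from the question: (id, name, number)
rows = [
    (1, "joe", 148590),
    (2, "bob", 148590),
    (2, "steve", 279109),
    (3, "sue", 382901),
    (3, "linda", 148590),
]

# collect_list semantics: for each group key, accumulate the
# column's values into a list, duplicates included.
grouped = defaultdict(list)
for id_, name, _number in rows:
    grouped[id_].append(name)

print(dict(grouped))  # {1: ['joe'], 2: ['bob', 'steve'], 3: ['sue', 'linda']}
```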

If you already have it defined, try the snippet below:

df.groupBy('id','length','type').pivot('id').agg({'name':'collect_list'})
sam
  • How do you define it? I don't think mine is. – formicaman Jul 01 '20 at 18:49
  • Check my updated answer. Here is a useful link: https://stackoverflow.com/questions/41026178/custom-aggregation-on-pyspark-dataframes – sam Jul 01 '20 at 18:55
  • I get the error: `Aggregate expression required for pivot, found 'pythonUDF#33';` – formicaman Jul 01 '20 at 18:57
  • That seems to be a different error, which also means your collect_list related error is solved. For the new error, I believe you need to change the order of agg and pivot: `df.groupBy('id','length','type').agg({'name':'collect_list'}).pivot('id')` – sam Jul 01 '20 at 18:59
  • Also, can you mark this question as answered, since it only concerns collect_list? – sam Jul 01 '20 at 19:01
  • Marked as answered. But when I try what you suggested: `pyspark.sql.utils.AnalysisException: u"expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"` – formicaman Jul 01 '20 at 19:15
  • 1
    Ok.. looks like pivot should be first. So that is not the issue. Maybe you can post a new question with this pivot related error. – sam Jul 01 '20 at 19:27
  • thanks. see https://stackoverflow.com/questions/62684123/pyspark-aggregate-expression-required-for-pivot-found-pythonudf – formicaman Jul 01 '20 at 19:31