
According to the docs, the collect_set and collect_list functions should be available in Spark SQL. However, I cannot get them to work. I'm running Spark 1.6.0 using a Docker image.

I'm trying to do this in Scala:

import org.apache.spark.sql.functions._ 

df.groupBy("column1") 
  .agg(collect_set("column2")) 
  .show() 

And receive the following error at runtime:

Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function collect_set; 

I also tried it using PySpark, but it fails there too. The docs state that these functions are aliases of Hive UDAFs, but I can't figure out how to enable them.

How can I fix this? Thanks!

JFX

1 Answer


Spark 2.0+:

SPARK-10605 introduced native implementations of collect_list and collect_set. A SparkSession with Hive support, or a HiveContext, is no longer required.

Spark 2.0-SNAPSHOT (before 2016-05-03):

You have to enable Hive support for a given SparkSession:

In Scala:

val spark = SparkSession.builder
  .master("local")
  .appName("testing")
  .enableHiveSupport()  // <- enable Hive support.
  .getOrCreate()

In Python:

spark = (SparkSession.builder
    .enableHiveSupport()
    .getOrCreate())

Spark < 2.0:

To be able to use Hive UDFs (see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF), you have to use Spark built with Hive support (this is already covered if you use the pre-built binaries, which seems to be the case here) and create a HiveContext from your SparkContext instead of a plain SQLContext.

In Scala:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext

val sqlContext: SQLContext = new HiveContext(sc) 

In Python:

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)
Thomas Decaux
zero323
  • And what about 1.6.1? The documentation says it is available "@since 1.6.0", but I'm still getting that error. – Khachatur Stepanyan Mar 27 '17 at 15:56
  • Hey @zero323, I'm trying to use the collect_list function in Spark 1.5.0. I've created the HiveContext, but can't figure out how to import the function. This doesn't compile: .groupBy(providerData("PRVSEQ"), providerData("PROV_NUM")) .agg(collect_list(regexp_replace(triggerReport("match_type"), "_(Individual|Practice)Model.", ""))) – nemo Mar 28 '17 at 20:07
  • @VijayRatnagiri It was introduced in 1.6. As far as I remember, in 1.5 you should be able to use it in a raw SQL query on a registered temporary table. – zero323 Mar 28 '17 at 21:06
    @KhachaturStepanyan In 1.6 you still need Hive support. – zero323 Mar 28 '17 at 21:06