2

I am using Spark with Java 8. I have a dataframe where one of the columns contains a single mllib.linalg.Vector. I want to groupBy one of the other columns in the dataframe, say an ID column, and "collect_list" the feature vectors into a list. I'm getting the error below. I don't understand why. This is a generic operation, why does it care about the type of the data in the column? it works fine for scalar numbers, or strings, etc, but does not seem to work for mllib Vector. Is there a workaround this?, maybe another function other than collect_list()?

No handler for Hive udf class org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: org.apache.spark.mllib.linalg.VectorUDT@f71b0bce (of class org.apache.spark.mllib.linalg.VectorUDT)
Kai
  • 1,464
  • 4
  • 18
  • 31

1 Answers1

1

What is the Spark version you are using? With Spark 1.6.2, it throws same error you have mentioned but this is working fine with Spark 2.0.1. See the sample code and output below.

public class JavaVectorExample {
    public static void main(String[] args) {
    //SparkSession
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaVectorExample")
      .master("local[2]")
      .getOrCreate();
    //schema
    StructType schema = createStructType(new StructField[]{
      createStructField("id", IntegerType, false),
      createStructField("label", DoubleType, false),
      createStructField("features", new VectorUDT(), false),
    });
    //dataset
    Row row1 = RowFactory.create(0, 1.0, Vectors.dense(0.0, 10.0, 0.5));
    Row row2 = RowFactory.create(1, 1.0, Vectors.dense(1.0, 10.5, 0.5));
    Row row3 = RowFactory.create(0, 1.5, Vectors.dense(0.0, 10.5, 1.0));
    Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row1,row2,row3), schema);
    dataset.printSchema();
    //groupby
    dataset.groupBy(col("id")).agg(collect_list(col("features"))).show(false);
    spark.stop();
  }
}

Here is the output.

+---+--------------------------------+
|id |collect_list(features)          |
+---+--------------------------------+
|1  |[[1.0,10.5,0.5]]                |
|0  |[[0.0,10.0,0.5], [0.0,10.5,1.0]]|
+---+--------------------------------+
abaghel
  • 14,783
  • 2
  • 50
  • 66
  • Thank you. Sadly, I am using/have to use 1.6.1. So, we gather that this is a bug in 1.6.x OR a new feature in 2.0. (in my case I was using SparceVector, but I don't think it matters, I'll check though. Thank you for the full code. – Kai Jan 12 '17 at 14:05
  • see this: http://stackoverflow.com/questions/35528966/use-collect-list-and-collect-set-in-spark-sql. Seems Spark 2.0 introduced a native collect_list/set() that does not use the Hive udf collect_list/set and it seems that they enabled the native spark versions to be [hopefully more] generic and able to handle non-primitive types. Hive version handles primitive, Struct, List and Map types by looking here: https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectList.java. – Kai Jan 12 '17 at 21:06
  • @kai I have seen that SO post. In Spark 1.6.2 if you try to use collect_list on Vector using SQLContext sqlContext = new SQLContext(sc); It will throw "undefined function collect_list" error. If you use SQLContext sqlContext = new HiveContext(sc); it will throw the exact error as you have mentioned in your question. So only way to get this work is use Spark 2.0 version. – abaghel Jan 13 '17 at 03:10
  • yes, that's what it looks like. But I may be able to convert to a Map first then collect those since hive allows Maps. – Kai Jan 13 '17 at 13:57