I am trying to create a Cassandra UDT from some columns of my data frame. I want to add this UDT column to the data frame and save it to the Cassandra table. My code looks like:

val asUDT = udf((keys: Seq[String], values: Seq[String]) =>
   UDTValue.fromMap(keys.zip(values).filter {
      case (k, null) => false
      case _ => true
    }.toMap))

val keys = array(mapKeys.map(lit): _*)
val values = array(mapValues.map(col): _*)
return df.withColumn("targetColumn", asUDT(keys, values))

However, I am receiving the following exception: Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type AnyRef is not supported.
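For what it's worth, the map-building logic inside the udf is not the problem and can be checked standalone (the `buildMap` name here is just for illustration); the exception comes from Spark not knowing a schema for `UDTValue`, the udf's return type:

```scala
// Standalone version of the udf body: pair keys with values,
// drop entries whose value is null, and build a Map.
def buildMap(keys: Seq[String], values: Seq[String]): Map[String, String] =
  keys.zip(values).filter {
    case (_, null) => false
    case _         => true
  }.toMap

// buildMap(Seq("a", "b"), Seq("1", null)) yields Map("a" -> "1")
```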

Please let me know how I can save a UDT value as a column in my data frame. Any pointers on how I can get this to work will be really helpful.

Thanks

Jds
  • To be able to insert custom class into `DataFrame` you need proper annotation on the class and Spark UDT which describes how to serialize and deserialize. – zero323 Oct 23 '15 at 16:13
  • Possible duplicate of [How to define schema for custom type in Spark SQL?](http://stackoverflow.com/questions/32440461/how-to-define-schema-for-custom-type-in-spark-sql) – zero323 Oct 23 '15 at 16:13
  • I saw that one, but that needs me to define the structure for the dataframe upfront, doesn't it? I was hoping I could just create a custom column and add it to the dataframe. – Jds Oct 23 '15 at 16:28
  • No, you can return custom type from udf. – zero323 Oct 23 '15 at 16:30
  • Oh alright.. Would you be able to explain how I can do that. I am not quite familiar with this yet.. – Jds Oct 23 '15 at 16:33
  • Truth be told I have no idea what is `UDTValue` and how it is represented :) Could you provide some details? What kind of data do you keep there? – zero323 Oct 23 '15 at 16:38
  • Spark Cassandra connector exposes UDTValue to de/serialize the data. I have different types of UDTs, the simplest one looks like this: `case class metric(value: Float, ccy: String, bill_no: String)` – Jds Oct 23 '15 at 16:44
  • IMHO if you have something that can be easily represented as a case class with supported fields it is better to use it directly as StructType. There is no value in UDT and every operation you'll try to perform on that will require quite a lot of wrangling not to mention will be more expensive than using native types. – zero323 Oct 23 '15 at 16:53
  • Ok.. but how do I do it. :( – Jds Oct 23 '15 at 16:57
  • Oh, case classes, including nested are supported out of the box. Try `case class Record(id: Long, metric: metric); sc.parallelize(Seq(Record(1L, metric(1.0f, "foo", "bar")))).toDF` :) – zero323 Oct 23 '15 at 17:03
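Pulling the comment thread together, a minimal sketch of the case-class approach suggested above might look like the following. The `Record` and `metric` names come from the comments; the commented-out Spark lines assume a `SparkContext` with SQL support and the Spark Cassandra connector on the classpath, and the `records`/`ks` table and keyspace names are hypothetical.

```scala
// Case classes from the comment thread: a nested case class maps to a
// StructType column out of the box, with no custom UDT needed.
case class metric(value: Float, ccy: String, bill_no: String)
case class Record(id: Long, metric: metric)

// With Spark available, the DataFrame can be built and saved like this:
//   import sqlContext.implicits._
//   val df = sc.parallelize(Seq(Record(1L, metric(1.0f, "foo", "bar")))).toDF
//   df.write.format("org.apache.spark.sql.cassandra")
//     .options(Map("table" -> "records", "keyspace" -> "ks"))
//     .save()

// Constructing a nested record works with plain Scala:
val sample = Record(1L, metric(1.0f, "foo", "bar"))
```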

0 Answers