I have been trying to create a Spark Dataset from case classes that contain Enums, but I can't get it to work. I'm using Spark 1.6.0. The exception complains that no encoder was found for my Enum. Is it not possible in Spark to have enums in the data?

Code:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object MyEnum extends Enumeration {
  type MyEnum = Value
  val Hello, World = Value
}

case class MyData(field: String, other: MyEnum.Value)

object EnumTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlCtx = new SQLContext(sc)

    import sqlCtx.implicits._

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS()

    println(s"df: ${df.collect().mkString(",")}}")
  }

}

Error:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.company.MyEnum.Value
- field (class: "scala.Enumeration.Value", name: "other")
- root class: "com.company.MyData"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
at com.company.EnumTest$.main(EnumTest.scala:22)
at com.company.EnumTest.main(EnumTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
ErikHabanero

1 Answer

You can provide your own encoder for the case class, for example a Kryo-based one:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object MyEnum extends Enumeration {
  type MyEnum = Value
  val Hello, World = Value
}

case class MyData(field: String, other: MyEnum.Value)

object MyDataEncoders {
  implicit def myDataEncoder: org.apache.spark.sql.Encoder[MyData] =
    org.apache.spark.sql.Encoders.kryo[MyData]
}

object EnumTest {
  import MyDataEncoders._

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlCtx = new SQLContext(sc)

    import sqlCtx.implicits._

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS()

    println(s"df: ${df.collect().mkString(",")}}")
  }
}
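
If a binary Kryo blob is not what you want, one possible alternative (a different technique from the encoder above, sketched here under assumptions) is to store the enum by name in a flat case class that has only `String` fields, which the built-in product encoder from `sqlCtx.implicits._` already handles. `MyDataRow`, `fromDomain`, `toDomain`, and `EnumAsStringSketch` are hypothetical names introduced only for this illustration; the Spark call is shown as a comment so the conversion itself runs as plain Scala.

```scala
object MyEnum extends Enumeration {
  type MyEnum = Value
  val Hello, World = Value
}

case class MyData(field: String, other: MyEnum.Value)

// Hypothetical flat mirror of MyData: the enum is stored by its name,
// so every field is a type with a built-in encoder.
case class MyDataRow(field: String, other: String)

object MyDataRow {
  // Enumeration values render as their declared name, e.g. "World".
  def fromDomain(d: MyData): MyDataRow = MyDataRow(d.field, d.other.toString)

  // withName throws NoSuchElementException if the name is unknown.
  def toDomain(r: MyDataRow): MyData = MyData(r.field, MyEnum.withName(r.other))
}

object EnumAsStringSketch {
  def main(args: Array[String]): Unit = {
    val original = MyData("hello", MyEnum.World)
    val row = MyDataRow.fromDomain(original)

    // Assumption: with a SQLContext and `import sqlCtx.implicits._` in scope,
    // rows like this could be loaded with sc.parallelize(Seq(row)).toDS().

    // Convert back to the enum-bearing class on the driver, e.g. after collect().
    val restored = MyDataRow.toDomain(row)
    println(s"row: $row, restored: $restored")
  }
}
```

The trade-off is that the enum is no longer checked by the compiler inside Spark, only when converting back with `toDomain`. In exchange the column stays a readable string rather than serialized bytes, and since the row class contains only strings, the same approach should also work with `toDF()`.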
Carlos Vilchez
  • Thanks! What if I want to use `toDF()` instead of `toDS()`? Then I get the following error: `Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type com.nordea.gpdw.dq.MyEnum.Value is not supported` – ErikHabanero Sep 26 '16 at 12:32
  • Are you using exactly the same code I used in my answer? I have tried changing `toDS` to `toDF` and it seems to work. – Carlos Vilchez Sep 27 '16 at 08:55
  • Yes, I am. Are you sure it prints `df: MyData(hello,World)}` on stdout? There is a lot of log output. – ErikHabanero Sep 27 '16 at 09:34
  • I can see the bytes for the object, but I am afraid that the `Encoder` functionality works properly only for the new `Dataset` type. – Carlos Vilchez Sep 27 '16 at 13:35
  • OK, so you can't really get this to work properly in Spark 1.6.0? Does it work in Spark 2? – ErikHabanero Sep 27 '16 at 16:02
  • The problem is that you have to use `Dataset` to make it work, and you can use it in Spark 1.6.0. But if you need to use `DataFrame`s, I would create a new question. – Carlos Vilchez Sep 28 '16 at 09:01
  • OK, I've created a new question: http://stackoverflow.com/questions/39746735/how-to-create-spark-dataframe-from-case-classes-that-contains-enums – ErikHabanero Sep 28 '16 at 11:55
  • It seems this is no longer needed in Spark 3.* – soMuchToLearnAndShare Aug 22 '23 at 08:36