
I have some data stored as parquet files and case classes matching the data schema. Spark deals well with regular Product types, so if I have

case class A(s: String, i: Int)

I can easily do

spark.read.parquet(file).as[A]

But from what I understand, Spark doesn't handle disjunction types, so when I have enums in my parquet, previously encoded as integers, and a Scala representation like

sealed trait E
case object A extends E
case object B extends E

I cannot do

spark.read.parquet(file).as[E]
// java.lang.UnsupportedOperationException: No Encoder found for E

Makes sense so far, but then, probably too naively, I try

import scala.reflect.ClassTag
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

implicit val eEncoder = new org.apache.spark.sql.Encoder[E] {
  def clsTag = ClassTag(classOf[E])
  def schema = StructType(StructField("e", IntegerType, nullable = false) :: Nil)
}

And I still get the same "No Encoder found for E" :(

My question at this point is: why is the implicit not found in scope (or not recognized as an Encoder[E])? And even if it were, how would such an interface let me actually decode the data? I would still need to map the stored value to the proper case object.
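To make that last point concrete, here is a rough, purely illustrative sketch of the mapping I think I would still have to write by hand (assuming the enum is stored in an Int column called e, and that 0 / 1 are the stored values):

// Illustrative only: manually decode the stored integer into the case objects
def fromInt(i: Int): E = i match {
  case 0 => A
  case 1 => B
  case other => throw new IllegalArgumentException(s"unexpected enum value $other")
}

// Even with an Encoder[E] in scope, something would still have to apply fromInt, e.g.
// spark.read.parquet(file).select($"e".as[Int]).map(fromInt)  // which again needs an Encoder[E]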

I did read a related answer that says "TL;DR There is no good solution right now, and given Spark SQL / Dataset implementation, it is unlikely there will be one in the foreseeable future." But I'm struggling to understand why a custom Encoder couldn't do the trick.

apanday

1 Answer


But I'm struggling to understand why a custom Encoder couldn't do the trick.

Two main reasons:

  • There is no API for custom Encoders. The only publicly available ones are the "binary" Kryo and Java Encoders, which create blobs that are useless in a DataFrame / Dataset[Row] context and support no meaningful SQL / DataFrame operations.

    Code like this would work fine

    import org.apache.spark.sql.Encoders
    
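    // Produces a Dataset[E] stored as a single binary column, so no meaningful column-level operations are available.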
    spark.createDataset(Seq(A, B): Seq[E])(Encoders.kryo[E])
    

    but it is nothing more than a curiosity.

  • DataFrame is a columnar store. It is technically possible to encode type hierarchies on top of this structure (the private UserDefinedType API does exactly that), but it is cumbersome (you have to provide storage for all possible variants; see for example How to define schema for custom type in Spark SQL?) and inefficient (complex types are generally second-class citizens in Spark SQL, and many optimizations are not accessible with complex schemas, subject to future changes).

    In a broader sense, the DataFrame API is effectively relational (as in relational algebra), and tuples (the main building block of relations) are by definition homogeneous, so by extension there is no place in the SQL / DataFrame API for heterogeneous structures.
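In practice the usual workaround is to keep the primitive representation in the Dataset schema and convert to the ADT only at the boundary, in plain Scala code. A minimal sketch, assuming the enum is stored as an Int column named e (the names Record and E.fromInt, and the 0 / 1 mapping, are illustrative, not part of any Spark API):

import org.apache.spark.sql.{Dataset, SparkSession}

sealed trait E
case object A extends E
case object B extends E

object E {
  // Boundary conversion; the 0 / 1 mapping is an assumption for the example.
  def fromInt(i: Int): E = i match {
    case 0 => A
    case 1 => B
    case other => throw new IllegalArgumentException(s"unexpected enum value $other")
  }
}

// Storage-level representation: a plain product type Spark can encode.
case class Record(name: String, e: Int)

def example(spark: SparkSession, path: String): Unit = {
  import spark.implicits._

  // Inside Spark everything stays relational: Catalyst only sees String / Int columns.
  val records: Dataset[Record] = spark.read.parquet(path).as[Record]

  // Convert to the ADT only inside typed lambdas or after collecting,
  // where values are ordinary Scala objects again.
  val onlyAs: Dataset[Record] = records.filter(r => E.fromInt(r.e) == A)
  val decoded: Seq[(String, E)] = records.collect().toSeq.map(r => (r.name, E.fromInt(r.e)))
}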

  • Thanks for your answer. Really the case at hand is that of case classes containing enums as some of their fields, where an enum is nothing more than a name for an int. So in the simple case we're not dealing with heterogeneous types, and this limitation is quite frustrating, as all our case classes are unusable (not your fault) :) – apanday Jun 19 '18 at 12:04