
If I wanted to create a StructType (i.e. a DataFrame.schema) out of a case class, is there a way to do it without creating a DataFrame? I can easily do:

case class TestCase(id: Long)
val schema = Seq[TestCase]().toDF.schema

But it seems overkill to actually create a DataFrame when all I want is the schema.

(If you are curious, the reason behind the question is that I am defining a UserDefinedAggregateFunction, and to do so you override a couple of methods that return StructTypes and I use case classes.)

David Griffin

4 Answers


You can do it the same way SQLContext.createDataFrame does it:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
val schema = ScalaReflection.schemaFor[TestCase].dataType.asInstanceOf[StructType]
Tzach Zohar
  • Thanks -- hadn't quite made it into `o.a.s.sql.catalyst` yet. And had I been thinking straight I would have started with `createDataFrame` just like you did. `:-(` – David Griffin Apr 20 '16 at 14:07
  • Sweet, you can even do `...schemaFor[(Long,Int,Long)]...` – David Griffin Apr 20 '16 at 14:11
  • No worries - I only found it easily because I've tried something similar myself a while ago ;) And yes - would work for any `Product`, thank you Scala! – Tzach Zohar Apr 20 '16 at 14:19
  • I kind of like the `toDF` version just for terseness though – David Griffin Apr 20 '16 at 14:21
  • Do you know how to do an Array of type T with this approach? I tried wrapping T in another case class but it does not work as expected – Yeikel Jan 15 '19 at 22:50
  • Like `Encoders` in another answer, all of `org.apache.spark.sql.catalyst` is considered experimental (e.g. it isn't included in the online documentation): https://github.com/apache/spark/blob/v2.4.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala#L21-L22 – huon Feb 28 '19 at 03:34

I know this question is almost a year old, but I came across it and thought that others who find it might want to know about an approach I have just learned:

import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema
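
To make the result concrete, here is a minimal sketch that compares the derived schema with one written out by hand. The `Reading` case class and the `SchemaSketch` object are hypothetical names used only for illustration, and the sketch assumes Spark SQL is on the classpath. As far as I know, no SparkSession is needed for this call.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical case class, used only for illustration.
case class Reading(id: Long, label: String)

object SchemaSketch {
  // Derived from the case class; no DataFrame is created.
  val derived: StructType = Encoders.product[Reading].schema

  // The same schema written out by hand, for comparison.
  // A primitive Long field is non-nullable; a String field is nullable.
  val manual: StructType = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("label", StringType, nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    // Prints the derived schema as a tree.
    derived.printTreeString()
  }
}
```

The case class is defined at the top level because the encoder is derived by reflection and needs a `TypeTag` for it.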
Jahfet
Kurt
  • Be aware - the `Encoders` object is flagged with the `@Experimental` annotation: "An experimental user-facing API. Experimental API's might change or be removed in minor versions of Spark, or be adopted as first-class Spark API's." Discovered that in an effort to figure out pros/cons of the different approaches (current answer vs accepted answer.) – Rick Haffey Nov 13 '17 at 18:50

In case someone wants to do this for a custom Java bean:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
ExpressionEncoder.javaBean(Event.class).schema().json()
Art
  • There's also `Encoders.bean(Event.class).schema()` which I assume does the same. – Rick Moritz Jul 24 '17 at 16:50
  • When I use this to set the schema, I have the problem that the function above returns data members in alphabetical position while my data columns in the file are not. As it tries to match on order instead of on name, this results in corrupt data. Any ideas on how to solve this? – Sparky Mar 26 '18 at 14:30

Instead of manually reproducing the logic for creating the implicit Encoder object that gets passed to toDF, one can use that directly (or, more precisely, implicitly in the same way as toDF):

// spark: SparkSession

import org.apache.spark.sql.Encoder
import spark.implicits._

implicitly[Encoder[MyCaseClass]].schema

Unfortunately, this actually suffers from the same problem as using org.apache.spark.sql.catalyst or Encoders as in the other answers: the Encoder trait is experimental.

How does this work? The toDF method on Seq comes from a DatasetHolder, which is created via the implicit localSeqToDatasetHolder that is imported via spark.implicits._. That function is defined like:

implicit def localSeqToDatasetHolder[T](s: Seq[T])(implicit arg0: Encoder[T]): DatasetHolder[T]

As you can see, it takes an implicit Encoder[T] argument, which, for a case class, can be computed via newProductEncoder (also imported via spark.implicits._). We can reproduce this implicit logic to get an Encoder for our case class via the convenience method scala.Predef.implicitly (in scope by default, because it's from Predef), which simply returns the implicit argument it requests:

def implicitly[T](implicit e: T): T
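
The mechanism can be sketched without Spark at all. In the following minimal example, `MiniEncoder`, `MiniHolder`, `MiniImplicits`, and `ImplicitlyDemo` are hypothetical stand-ins invented for illustration (they are not Spark APIs); they only mimic the roles of `Encoder`, `DatasetHolder`, and `spark.implicits._` described above.

```scala
// Hypothetical stand-in for Spark's Encoder: it only carries a schema description.
trait MiniEncoder[T] { def schema: String }

// Hypothetical stand-in for DatasetHolder: it is what provides toDF.
final class MiniHolder[T](data: Seq[T], enc: MiniEncoder[T]) {
  def toDF: String = s"frame(${enc.schema}, rows=${data.size})"
}

case class TestCase(id: Long)

object MiniImplicits {
  import scala.language.implicitConversions

  // Plays the role of newProductEncoder for this one case class.
  implicit val testCaseEncoder: MiniEncoder[TestCase] =
    new MiniEncoder[TestCase] { val schema = "id: long" }

  // Plays the role of localSeqToDatasetHolder: a conversion that needs an implicit encoder.
  implicit def localSeqToMiniHolder[T](s: Seq[T])(implicit enc: MiniEncoder[T]): MiniHolder[T] =
    new MiniHolder(s, enc)
}

object ImplicitlyDemo {
  import MiniImplicits._

  // The conversion wraps the Seq and silently picks up the encoder.
  val viaToDF: String = Seq(TestCase(1L)).toDF

  // implicitly asks the compiler for the same encoder directly, with no Seq involved.
  val viaImplicitly: String = implicitly[MiniEncoder[TestCase]].schema

  def main(args: Array[String]): Unit = {
    println(viaToDF)
    println(viaImplicitly)
  }
}
```

Both values are resolved from the same implicit encoder instance; the `implicitly` route just skips building the holder.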
huon