
I am trying to use Spark SQL in a Spring Boot application. We want to create a Dataset from a list of Employee objects. Employee is a Scala case class defined by another team, and we were able to build the list of objects in our project.
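
For reference, the case class looks roughly like this (we don't own the definition, so the fields below are inferred from how we construct it):

case class Employee(id: Int, name: String, age: Int)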

But when we try to create a Dataset from that list, we get an empty Dataset with no values.

List<Employee> employees = new ArrayList<>();
employees.add(new Employee(1, "Tom", 25));
Dataset<Row> employeeDF = spark.createDataFrame(employees, Employee.class);

The resulting employeeDF is empty, so I'm not sure whether we are doing this correctly or missing something.

Is it possible to create a Dataset from a list of objects of a Scala case class?

I have searched everywhere for this issue and was not able to find a solution. If it's a duplicate, please let me know.

Thanks for your reply,
Harminder

1 Answer


For reference, the output of employeeDF.show() is:

++
||
++
||
++

This is due to:

Dataset<Row> employeeDF = spark.createDataFrame(employees, Employee.class);

expecting a Java bean class, which Employee is not. To handle a Scala Product (a case class or tuple) you need an Encoder, and constructing one requires a TypeTag, which normally only the Scala compiler can generate. Ask your Scala team to add this object to their code (based on this SO answer):

object TypeUtils {

  import org.apache.spark.sql.{Encoder, Encoders}

  import scala.reflect.api.Universe
  import scala.reflect.runtime.universe._

  // Builds a TypeTag for a class at runtime; this is the piece the
  // Scala compiler would normally generate at compile time.
  def typeTag[T](clazz: Class[T]): TypeTag[T] = {
    val m = runtimeMirror(clazz.getClassLoader)
    val tpe = m.classSymbol(clazz).toType
    val typeCreator = new scala.reflect.api.TypeCreator {
      def apply[U <: Universe with Singleton](m1: scala.reflect.api.Mirror[U]): U#Type =
        if (m1 != m) throw new RuntimeException("wrong mirror") else tpe.asInstanceOf[U#Type]
    }
    TypeTag[T](m, typeCreator)
  }

  // Derives a Spark Encoder for any Product (case class or tuple)
  // from its Class object, so it can be called from Java.
  def productEncoderFor[T <: Product](clazz: Class[T]): Encoder[T] =
    Encoders.product[T](typeTag(clazz))
}

Then, from your Java code, you can write:

Dataset<Employee> employeeDF = spark.createDataset(employees, TypeUtils.productEncoderFor(Employee.class));
employeeDF.show();

yielding:

+---+----+---+
| id|name|age|
+---+----+---+
|  1| Tom| 25|
+---+----+---+

The productEncoderFor function will work for any such case class or tuple. If you really need a Dataset<Row>, just call .toDF() on the resulting Dataset.
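
For comparison, in plain Scala no reflection helper is needed, because the compiler supplies the TypeTag implicitly. A minimal sketch, assuming a SparkSession named spark and employees as a Scala Seq[Employee]:

import org.apache.spark.sql.{Dataset, Encoders, Row}

// The compiler provides the TypeTag implicitly here, which is
// exactly the step that is missing when calling from Java.
val ds: Dataset[Employee] = spark.createDataset(employees)(Encoders.product[Employee])

// Convert to an untyped Dataset[Row] (a DataFrame) if needed.
val df: Dataset[Row] = ds.toDF()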

Chris