
I'm trying to run an example from the Spark book Spark: The Definitive Guide.

build.sbt

ThisBuild / scalaVersion := "3.2.1"

libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-sql" % "3.2.0" % "provided").cross(CrossVersion.for3Use2_13)
)

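// keep the "provided" Spark dependency on the classpath when running with `sbt run`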
Compile / run := Defaults.runTask(Compile / fullClasspath, Compile / run / mainClass, Compile / run / runner).evaluated

lazy val root = (project in file("."))
  .settings(
    name := "scalalearn"
  )

main.scala

// imports
...

object spark1 {
  @main
  def main(args: String*): Unit = {
    ...

    case class Flight(
                       DEST_COUNTRY_NAME: String,
                       ORIGIN_COUNTRY_NAME: String,
                       count: BigInt
                     )

    val flightsDF = spark.read
      .parquet(s"$dataRootPath/data/flight-data/parquet/2010-summary.parquet/")

    //    import spark.implicits._ // FAILS
    //    import spark.sqlContext.implicits._ // FAILS

    val flights = flightsDF.as[Flight]

    // in Scala
    flights
      .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
      .map(flight_row => flight_row)
      .take(5)

    spark.stop()
  }
}

I'm getting an error on the line `val flights = flightsDF.as[Flight]`:

Unable to find encoder for type Flight. An implicit Encoder[Flight] is needed to store Flight
 instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes)
 are supported by importing spark.implicits._ Support for serializing other types will be added in
 future releases.

Any help is appreciated.

Scala 3.2.1, Spark 3.2.0

I tried importing implicits from `spark.implicits._` and `spark.sqlContext.implicits._`. The example works on Scala 2.x.

I'm looking for a way to convert a DataFrame to a Dataset of a case class without any third-party workarounds.

Yashwanth
  • In 3.2.0 the full error message is what you wrote and then `I found: spark.implicits.newProductEncoder[Flight](/*missing */summon[reflect.runtime.universe.TypeTag[Flight]]) But no implicit values were found that match type reflect.runtime.universe.TypeTag[Flight].` So read https://stackoverflow.com/questions/73836319/scala-spark-encoders-productx-where-x-is-a-case-class-keeps-giving-me-no-ty – Dmytro Mitin Oct 30 '22 at 01:22
  • I meant the error message if we uncomment one of imports `import spark.implicits._`, `import spark.sqlContext.implicits._` – Dmytro Mitin Oct 30 '22 at 01:25
  • Scala 3 is not officially supported by Spark. Would not recommend it outside experimental scope. – cchantep Oct 31 '22 at 17:00
  • https://www.47deg.com/blog/using-scala-3-with-spark/ https://medium.com/virtuslab/scala-3-and-spark-389f7ecef71b – Dmytro Mitin Nov 01 '22 at 04:51

1 Answer


You need to add the Scala 3 dependency for Spark codecs:

https://github.com/vincenzobaz/spark-scala3

libraryDependencies += "io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"

and use the Scala 3 import

import scala3encoders.given

instead of the Scala 2 imports

import spark.implicits._ // FAILS
import spark.sqlContext.implicits._ // FAILS

See also: Scala Spark Encoders.product[X] (where X is a case class) keeps giving me "No TypeTag available for X" error
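
Putting it together, here is a minimal sketch of the fixed program. It assumes a local SparkSession and a relative data path (both placeholders, not from the original post), and declares count as Long to sidestep the BigInt issue discussed next:

import org.apache.spark.sql.SparkSession
import scala3encoders.given // instead of spark.implicits._

// top-level case class; count: Long avoids needing a custom BigInt codec (see below)
case class Flight(
  DEST_COUNTRY_NAME: String,
  ORIGIN_COUNTRY_NAME: String,
  count: Long
)

@main def run(): Unit =
  val spark = SparkSession.builder().master("local[*]").appName("scalalearn").getOrCreate()
  val flights = spark.read
    .parquet("data/flight-data/parquet/2010-summary.parquet/") // placeholder path
    .as[Flight]
  flights
    .filter(_.ORIGIN_COUNTRY_NAME != "Canada")
    .take(5)
    .foreach(println)
  spark.stop()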


Regarding BigInt:

Does Spark support BigInteger type?

Spark does support Java BigIntegers but possibly with some loss of precision. If the numerical value of the BigInteger fits in a long (i.e. between -2^63 and 2^63-1) then it will be stored by Spark as a LongType. Otherwise it will be stored as a DecimalType, but this type only supports 38 digits of precision.
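
As a quick illustration of that cutoff (my addition, not part of the linked answer), BigInt.isValidLong tells you which representation a given value needs:

// fits in [-2^63, 2^63-1], so Spark can store it as LongType
assert(BigInt(348113).isValidLong)
// 30 digits, does not fit in a Long, so DecimalType (at most 38 digits) is needed
assert(!BigInt("123456789012345678901234567890").isValidLong)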

The correct codecs for comparatively small BigInts (fitting into LongType) are:

import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{DataType, LongType, ObjectType}

given Deserializer[BigInt] with
  def inputType: DataType = LongType

  def deserialize(path: Expression): Expression =
    StaticInvoke(
      BigInt.getClass,
      ObjectType(classOf[BigInt]),
      "apply",
      path :: Nil,
      returnNullable = false
    )

given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])

  def serialize(inputObject: Expression): Expression =
    Invoke(inputObject, "longValue", LongType, returnNullable = false)

import scala3encoders.given
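
With these two givens in scope together with import scala3encoders.given, the derived Encoder[Flight] reads count from a LongType column via BigInt.apply(long) and writes it back with .longValue.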

https://github.com/DmytroMitin/spark_stackoverflow/blob/87ef5361dd3553f8cc5ced26fed4c17c0061d6a2/src/main/scala/main.scala

(https://github.com/databricks/Spark-The-Definitive-Guide)

https://github.com/yashwanthreddyg/spark_stackoverflow/pull/1

https://gist.github.com/DmytroMitin/3c0fe6983a254b350ff9feedbb066bef

https://github.com/vincenzobaz/spark-scala3/pull/22

For large BigInts (not fitting into LongType, when DecimalType is necessary) the codecs are:

import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{DataType, DataTypes, Decimal, ObjectType}

val decimalType = DataTypes.createDecimalType(38, 0)

given Deserializer[BigInt] with
  def inputType: DataType = decimalType

  def deserialize(path: Expression): Expression =
    Invoke(path, "toScalaBigInt", ObjectType(classOf[scala.math.BigInt]), returnNullable = false)

given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])

  def serialize(inputObject: Expression): Expression =
    StaticInvoke(
      Decimal.getClass,
      decimalType,
      "apply",
      inputObject :: Nil,
      returnNullable = false
    )

import scala3encoders.given

which is almost the same as

import org.apache.spark.sql.catalyst.DeserializerBuildHelper.createDeserializerForScalaBigInt
import org.apache.spark.sql.catalyst.SerializerBuildHelper.createSerializerForScalaBigInt
import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.{DataType, DataTypes, ObjectType}

val decimalType = DataTypes.createDecimalType(38, 0)

given Deserializer[BigInt] with
  def inputType: DataType = decimalType

  def deserialize(path: Expression): Expression =
    createDeserializerForScalaBigInt(path)

given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])

  def serialize(inputObject: Expression): Expression =
    createSerializerForScalaBigInt(inputObject)

import scala3encoders.given
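
The only difference is that createDeserializerForScalaBigInt and createSerializerForScalaBigInt are Spark's own internal helpers from DeserializerBuildHelper and SerializerBuildHelper, which build the same Invoke/StaticInvoke Catalyst expressions for you.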

https://gist.github.com/DmytroMitin/8124d2a4cd25c8488c00c5a32f244f64

The runtime exception you observed means that the BigInts from the parquet file are comparatively small (fitting into LongType) while you tried my codecs for large BigInts (DecimalType):

https://gist.github.com/DmytroMitin/ad77677072c1d8d5538c94cb428c8fa4 (ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java': A method named "toScalaBigInt" is not declared in any enclosing class nor any supertype, nor through a static import)

Vice versa, for large BigInts (DecimalType) and codecs for small BigInts (LongType): https://gist.github.com/DmytroMitin/3a3a61082fbfc12447f6e926fc45c7cd (ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java': No applicable constructor/method found for actual parameters "org.apache.spark.sql.types.Decimal"; candidates are: ...)

We can't use both codecs for LongType and DecimalType: https://gist.github.com/DmytroMitin/32040a6b702fff5c53c727616b318cb5 (Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: All input types must be the same except nullable, containsNull, valueContainsNull flags. The input types found are LongType DecimalType(38,0))

For a mixture of small and large BigInts, the correct approach is to use the codecs for DecimalType: https://gist.github.com/DmytroMitin/626e09a63a387e6ff1d7fe264fc14d6b


The approach with manually created TypeTags seems to work too (without scala3encoders):

// libraryDependencies += scalaOrganization.value % "scala-reflect" % "2.13.10" // in Scala 3
import scala.reflect.api
import scala.reflect.runtime.universe.{NoType, Type, TypeTag, internal}
import scala.reflect.runtime.universe

// wraps an already-computed runtime Type into a TypeTag[T] via a TypeCreator
inline def createTypeTag[T](mirror: api.Mirror[_ <: api.Universe with Singleton], tpe: mirror.universe.Type): mirror.universe.TypeTag[T] = {
  mirror.universe.TypeTag.apply[T](mirror.asInstanceOf[api.Mirror[mirror.universe.type]],
    new api.TypeCreator {
      override def apply[U <: api.Universe with Singleton](m: api.Mirror[U]): m.universe.Type = {
        tpe.asInstanceOf[m.universe.Type]
      }
    }
  )
}

val rm = universe.runtimeMirror(this.getClass.getClassLoader)
// val bigIntTpe = internal.typeRef(internal.typeRef(NoType, rm.staticPackage("scala.math"), Nil), rm.staticClass("scala.math.BigInt"), Nil)
// val strTpe = internal.typeRef(internal.typeRef(NoType, rm.staticPackage("java.lang"), Nil), rm.staticClass("java.lang.String"), Nil)
// reflectively construct the Type for the top-level class Flight
val flightTpe = internal.typeRef(NoType, rm.staticClass("Flight"), Nil)
// given TypeTag[BigInt] = createTypeTag[BigInt](rm, bigIntTpe)
// given TypeTag[String] = createTypeTag[String](rm, strTpe)
given TypeTag[Flight] = createTypeTag[Flight](rm, flightTpe)

import spark.implicits._
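
This works because newProductEncoder from the Scala 2 spark.implicits._ only requires a TypeTag[Flight], which the manually constructed given supplies.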

https://gist.github.com/DmytroMitin/bb0ccd5f1c533b2baec1756da52f8824

Dmytro Mitin
  • It works if I remove `count: BigInt` field from the case class. The error is: ``` No given instance of type scala3encoders.derivation.Serializer[BigInt] was found. ... But Failed to synthesize an instance of type deriving.Mirror.Of[BigInt]: * class BigInt is not a generic product because it is not a case class * class BigInt is not a generic sum because it is not a sealed class. val flights = flightsDF.as[Flight] ``` Works if I change BigInt to Int – Yashwanth Oct 30 '22 at 04:01
  • Correction: It compiles, but having BigInt is necessary to run this example – Yashwanth Oct 30 '22 at 04:07
  • Tried the updated solution. At runtime, I get the error `ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 82, Column 24: A method named "toScalaBigInt" is not declared in any enclosing class nor any supertype, nor through a static import` – Yashwanth Oct 30 '22 at 21:25
  • @Yashwanth Thanks for your feedback. Could you provide some short code sample reproducing this runtime error? – Dmytro Mitin Oct 30 '22 at 23:02
  • @Yashwanth It seems it's `Decimal` that has `.toScalaBigInt` and not `LongType`. So maybe `LongType` should be replaced in the serailizer/deserializer with `DecimalType`. But I'm just guessing blindly. If I had error-reproducing code from you I could try to improve the codec. – Dmytro Mitin Oct 30 '22 at 23:24
  • Hi, you can find the full code here: https://github.com/yashwanthreddyg/spark_stackoverflow/tree/main At this point I'm thinking of sticking to scala 2.x. Do you know if there is first party way of converting DFs to case classes? Thank you for the help so far. – Yashwanth Oct 31 '22 at 21:21
  • @Yashwanth I finally figured out the correct Serializer/Deserializer for BigInt too. See the update. – Dmytro Mitin Nov 04 '22 at 21:01
  • https://virtuslab.com/blog/reconciling-spark-apis-for-scala/ Also https://github.com/zio/zio-quill/tree/master/quill-spark/src – Dmytro Mitin Nov 23 '22 at 18:23
  • https://github.com/VirtusLab/iskra (link from MateuszKubuszok in https://stackoverflow.com/questions/74549477/scala3-crafting-types-through-metaprogramming ) – Dmytro Mitin Nov 29 '22 at 00:54