3

Sorry for asking a simple question. I want to pass a case class as a function argument and use it further inside the function. So far I have tried this with TypeTag and ClassTag, but for some reason I am unable to use them properly, or maybe I am not looking in the right place.

The use case is something like this:

case class infoData(colA:Int,colB:String)
case class someOtherData(col1:String,col2:String,col3:Int)

def readCsv[T:???](path:String,passedCaseClass:???): Dataset[???] = {
  sqlContext
    .read
    .option("header", "true")
    .csv(path)
    .as[passedCaseClass]
}

It will be called something like this:

val infoDf = readCsv("/src/main/info.csv",infoData)
val otherDf = readCsv("/src/main/someOtherData.csv",someOtherData)
Sunil Kumar
  • What is the signature of `as` ? Does it take an implicit "reader" to do the conversion (like PlayJSON would for example)? Then you could do something like `def readCSV[T](path: String)(implicit reader: Reader[T]): Dataset[T]` – Thilo Dec 03 '18 at 10:22
  • It looks like the "reader" here (in Spark) is called `Encoder[T]`. So an implicit of that type should work. – Thilo Dec 03 '18 at 10:27
  • Hi, as far as I understand `as` takes a `case class` name as an argument to convert the `dataframe` to a `dataset` in `spark`. I use it like this. I am still learning `spark` and `scala`. What do you mean by implicit of that type? Can you explain? If I have something like `val infoEncoder = someEncoder`, so do you mean I should pass `infoEncoder` in the `readCsv` method? – Sunil Kumar Dec 03 '18 at 10:35

2 Answers

8

There are two things you should pay attention to:

  1. Class names should be in CamelCase, so `InfoData`.
  2. Once you have bound a type to a `Dataset`, it's not a `DataFrame` anymore. `DataFrame` is just a special name for a `Dataset` of the general-purpose `Row` (in fact, `type DataFrame = Dataset[Row]`).

What you need is to ensure that your class has an implicit instance of the corresponding `Encoder` in the current scope.

case class InfoData(colA: Int, colB: String)

`Encoder` instances for primitive types (`Int`, `String`, etc.) and case classes can be obtained by importing `spark.implicits._`:

def readCsv[T](path: String)(implicit encoder: Encoder[T]): Dataset[T] = {
  spark
    .read
    .option("header", "true")
    .csv(path)
    .as[T]
}

Or, you can use a context bound:

def readCsv[T: Encoder](path: String): Dataset[T] = {
  spark
    .read
    .option("header", "true")
    .csv(path)
    .as[T]
}
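As an aside, a context bound like `[T: Encoder]` is just syntactic sugar for an extra implicit parameter list, so the two definitions above are equivalent. A minimal, Spark-free sketch of the mechanism, using a made-up `Show` typeclass as a stand-in for `Encoder`:

```scala
// `Show` is a hypothetical stand-in typeclass playing the role
// that Encoder plays in Spark.
trait Show[T] { def show(t: T): String }

object Show {
  // Implicit instance for Int, analogous to the encoders that
  // `import spark.implicits._` brings into scope.
  implicit val intShow: Show[Int] = new Show[Int] {
    def show(t: Int): String = s"Int($t)"
  }
}

// `[T: Show]` desugars to `def describe[T](t: T)(implicit ev: Show[T])`;
// `implicitly` retrieves the instance the compiler filled in.
def describe[T: Show](t: T): String = implicitly[Show[T]].show(t)

println(describe(42)) // Int(42)
```

If no `Show[T]` (or, in Spark, `Encoder[T]`) instance is in scope, the call fails at compile time rather than at runtime.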

Now, you can use it as follows:

val spark = ...

import spark.implicits._

def readCsv[T: Encoder](path: String): Dataset[T] = {
  spark
    .read
    .option("header", "true")
    .csv(path)
    .as[T]
}

val infoDS = readCsv[InfoData]("/src/main/info.csv")
sarveshseri
7

First change your function definition to:

object t0 {
    def readCsv[T] (path: String)(implicit spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
      spark
        .read
        .option("header", "true")
        .csv(path)
        .as[T]
    }
}

You don't need any kind of reflection to create a generic readCsv function. The key here is that Spark needs the encoder at compile time, so you can take it as an implicit parameter and the compiler will supply it.

Because Spark SQL provides default encoders for product types (your case classes) through `spark.implicits._`, it is easy to call your function like this:

case class infoData(colA: Int, colB: String)
case class someOtherData(col1: String, col2: String, col3: Int)

object test {
  import t0._

  implicit val spark = SparkSession.builder().getOrCreate()

  import spark.implicits._
  readCsv[infoData]("/tmp")

}
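For context on why case classes "just work" here: every case class extends `Product`, and Spark's implicits derive an `Encoder[T]` for any `T` that is a `Product` with a `TypeTag`. A quick Spark-free illustration of the `Product` interface Spark relies on:

```scala
// Every case class automatically extends Product, which exposes
// its fields positionally. This is what Spark's encoder derivation
// inspects (together with a TypeTag) to build an Encoder.
case class InfoData(colA: Int, colB: String)

val d = InfoData(1, "a")
assert(d.isInstanceOf[Product])
println(d.productArity)      // number of fields: 2
println(d.productElement(1)) // second field: a
```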

Hope it helps.

Emiliano Martinez