1

I am trying to apply different types of logic according to the type of spark dataset. Depending on the type of case class that is passed to doWork (Customer or Worker) I have to apply different types of aggregation. How can I do that?

import org.apache.spark.sql.{Dataset, SparkSession}

object SparkSql extends App {
  import spark.implicits._

  val spark = SparkSession
    .builder()
    .appName("Simple app")
    .config("spark.master", "local")
    .getOrCreate()

  sealed trait Person {
    def name: String
  }

  final case class Customer(override val name: String, email: String)                extends Person
  final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 1, skills = Array("self-motivation"))
  ).toDS

  def doWork(persons: Dataset[Person]): Unit = {
    persons match {
      case ... // Dataset[Customer] ... do something
      case ... // Dataset[Worker] ... do something else
    }
  }

}
Michael
  • 2,436
  • 1
  • 36
  • 57
  • You can't do that due type erasure. Is the type of the dataset static or dynamic? Also, do you want to do something to all the Dataset or to each element? – Luis Miguel Mejía Suárez Jun 08 '20 at 13:23
  • 1
    You can't directly pattern-match on generics, as they don't exist during runtime. However, there might be a way out, see this answer for details: https://stackoverflow.com/questions/12218641/scala-what-is-a-typetag-and-how-do-i-use-it/12232195#12232195 – Rayan Ral Jun 08 '20 at 13:24
  • Query- How do you pass the `Dataset[Worker]` to `def doWork(persons: Dataset[Person]): Unit = {` ? – Som Jun 08 '20 at 14:23

4 Answers4

2

Try this-


sealed trait Person {
  def name: String
}

final case class Customer(override val name: String, email: String)                extends Person
final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

Test case-

  @Test
  def test62262873(): Unit = {

    val workers: Dataset[Worker] = Seq(
      Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
      Worker("Sam", id = 2, skills = Array("self-motivation"))
    ).toDS

    import scala.reflect.runtime.universe._
    def doWork[T : TypeTag](persons: Dataset[T]): Unit = {
      typeOf[T] match {
        case t if t =:= typeOf[Worker] => println("I'm worker")
          persons.as[Worker].filter(_.id == 2).show(false)
        case t if t =:= typeOf[Customer] => println("I'm Customer")
          persons.as[Customer].filter(_.name.contains("B")).show(false)

      }
    }
    doWork(workers)

    /**
      * I'm worker
      * +----+---+-----------------+
      * |name|id |skills           |
      * +----+---+-----------------+
      * |Sam |2  |[self-motivation]|
      * +----+---+-----------------+
      */
  }
Som
  • 6,193
  • 1
  • 11
  • 22
  • This is interesting however I cannot access the type withing each case, see this modification of your code ``` import scala.reflect.runtime.universe._ def doWork2[T: TypeTag](persons: Dataset[T]): Dataset[T] = { typeOf[T] match { case t if t =:= typeOf[Worker] => persons.filter(_.id == 2) case t if t =:= typeOf[Customer] => persons.filter(_.name.contains("B")) } } ``` – Michael Jun 08 '20 at 15:52
  • 1
    see edit, its working well for me, feel free to accept + upvote if it helps – Som Jun 08 '20 at 16:25
1

I found a solution to my own question however I want to give credit to Someshwar Kale'answer as it does what is requested. In this version, I am using implicit to created converter that I can extend as need be.

import org.apache.spark.sql.{Dataset, SparkSession}

object TempProject extends App {
  import spark.implicits._

  val spark = SparkSession
    .builder()
    .appName("Simple app")
    .config("spark.master", "local")
    .getOrCreate()

  sealed trait Person {
    def name: String
  }
  final case class Customer(override val name: String, email: String)                extends Person
  final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

  trait CustomDataProcessor[T] {
    def doSomethingCool(dataset: Dataset[T]): Dataset[T]
  }

  implicit object CustomerDataProcessor extends CustomDataProcessor[Customer] {

    override def doSomethingCool(dataset: Dataset[Customer]): Dataset[Customer] =
      dataset.filter(_.name.contains("B"))
  }

  implicit object WorkerDataProcessor extends CustomDataProcessor[Worker] {

    override def doSomethingCool(dataset: Dataset[Worker]): Dataset[Worker] =
      dataset.filter(_.id == 2)
  }

  def doWork[T](person: Dataset[T])(implicit processor: CustomDataProcessor[T]): Unit = {
    processor.doSomethingCool(person)
  }

  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 1, skills = Array("self-motivation"))
  ).toDS

  val customers: Dataset[Customer] = Seq(
    Customer("Bob", "bob@email"),
    Customer("Jack", "jack@email")
  ).toDS

  doWork(workers)
  doWork(customers)
}
Michael
  • 2,436
  • 1
  • 36
  • 57
0

With case classes you can do pattern matching. Case classes are Scala’s way to allow pattern matching on objects without requiring a large amount of boilerplate. Generally, all you need to do is add a single case keyword to each class that you want to be pattern matchable.

As an example:

abstract class Expr
case class Var(name: String) extends Expr
case class Number(num: Double) extends Expr
case class UnOp(operator: String, arg: Expr) extends Expr
case class BinOp(operator: String,left: Expr, right: Expr) extends Expr

def simplifyTop(expr: Expr): Expr = expr match {
  case UnOp("",UnOp("",e)) => e // Double negation
  case BinOp("+", e, Number(0)) => e // Adding zero
  case BinOp("*", e, Number(1)) => e // Multiplying by one
  case _ => expr
}

with your example I would try this

  def doWork(persons: Person): Unit = {
    persons match {
      case Customer => ... do something
      case Worker ... do something else
    }
  }

dataset.map(doWork)
Chema
  • 2,748
  • 2
  • 13
  • 24
  • Sure but how to proceed with Dataset of different types? – Michael Jun 08 '20 at 13:29
  • ````def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] (Scala-specific) Returns a new Dataset that contains the result of applying func to each element. Annotations @Experimental() @Evolving() Since 1.6.0 ```` You could apply map and pass to an object Person ````doWork(persons: Person)```` – Chema Jun 08 '20 at 14:43
0

Modify your method to accept [T <:parent] and you extract bean class name from Dataset.javaRdd as below

import org.apache.spark.sql.Dataset

object InheritDataframe {


  private def matcherDef[T <:parent](dfb: Dataset[T]): Unit = {

    dfb.toJavaRDD.classTag.toString() match {
      case "child1" =>  println("child1")
      case "child2" => println("child2")
      case _ => println("Unkown")
    }

  }

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    import spark.implicits._

    val dfB  = List(child1(1)).toDS()
    val dfC  = List(child2(1)).toDS()

    matcherDef(dfB)
    matcherDef(dfC)

  }


}

case class child1(i: Int) extends parent(i)

case class child2(i: Int) extends parent(i)

class parent(j: Int)


QuickSilver
  • 3,915
  • 2
  • 13
  • 29