
I understand the code below, and it was helpful.

However, I would like to make it generic. I cannot work out where to start, and I suspect it may not be possible with the case statement. I am looking at another approach, but I am still interested in whether a generic approach is possible here.

import spark.implicits._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, from_json}

// Creating case classes with the schema of your json objects. We're making
// these to make use of strongly typed Datasets. Notice that the MyChgClass has
// each field as an Option: this will enable us to choose between "chg" and
// "before"
case class MyChgClass(b: Option[String], c: Option[String], d: Option[String])
case class MyFullClass(k: Int, b: String, c: String, d: String)
case class MyEndClass(id: Int, after: MyFullClass)

// Creating schemas for the from_json function
val chgSchema = Encoders.product[MyChgClass].schema
val beforeSchema = Encoders.product[MyFullClass].schema

// Your dataframe from the example
val df = Seq(
  (1, """{"b": "new", "c": "new"}""",  """{"k": 1, "b": "old", "c": "old", "d": "old"}""" ),
  (2, """{"b": "new", "d": "new"}""",  """{"k": 2, "b": "old", "c": "old", "d": "old"}""" )
).toDF("id", "chg", "before")

// Parsing the json string into our case classes and finishing by creating a
// strongly typed dataset with the .as[] method
val parsedDf = df
  .withColumn("parsedChg",from_json(col("chg"), chgSchema))
  .withColumn("parsedBefore",from_json(col("before"), beforeSchema))
  .drop("chg")
  .drop("before")
  .as[(Int, MyChgClass, MyFullClass)]

// Mapping over our dataset with a lot of control of exactly what we want. Since
// the "chg" fields are options, we can use the getOrElse method to choose
// between either the "chg" field or the "before" field
val output = parsedDf.map{
  case (id, chg, before) => {
    MyEndClass(id, MyFullClass(
      before.k,
      chg.b.getOrElse(before.b),
      chg.c.getOrElse(before.c),
      chg.d.getOrElse(before.d)
    ))
  }
}

output.show(false)
parsedDf.printSchema()

We have many such situations, but with differing payloads. I can get the fields of the case class, but I cannot see the forest for the trees when it comes to making this generic, e.g. a [T] type approach for the code below. Can this be done at all?

I can get a List of attributes, and I am wondering whether something like attrList.map(x => ...) with substitution can be used in place of chg.b etc.?

  val output = parsedDf.map{
      case (id, chg, before) => {
        MyEndClass(id, MyFullClass(
          before.k,
          chg.b.getOrElse(before.b),
          chg.c.getOrElse(before.c),
          chg.d.getOrElse(before.d)
        ))
      }
    }
thebluephantom

2 Answers


Does the following macro work for your use case?

// libraryDependencies += scalaOrganization.value % "scala-reflect" % scalaVersion.value
import scala.language.experimental.macros
import scala.reflect.macros.blackbox

def mkInstance[A, B](before: A, chg: B): A = macro mkInstanceImpl[A]

def mkInstanceImpl[A: c.WeakTypeTag](c: blackbox.Context)(before: c.Tree, chg: c.Tree): c.Tree = {
  import c.universe._

  val A = weakTypeOf[A]

  val classAccessors = A.decls.collect {
    case m: MethodSymbol if m.isCaseAccessor => m
  }

  val arg = q"$before.${classAccessors.head}"
  val args = classAccessors.tail.map(m => q"$chg.${m.name}.getOrElse($before.$m)")

  q"new $A($arg, ..$args)"
}
// in a different subproject

val output = parsedDf.map{
  case (id, chg, before) => {
    MyEndClass(id,
      mkInstance(before, chg)
    )
  }
}

// scalacOptions += "-Ymacro-debug-lite"
// scalac: new MyFullClass(before.k, chg.b.getOrElse(before.b), chg.c.getOrElse(before.c), chg.d.getOrElse(before.d))

https://scastie.scala-lang.org/bXq5FHb3QuC5PqlhZOfiqA

Alternatively, you can use Shapeless:

// libraryDependencies += "com.chuusai" %% "shapeless" % "2.3.10"
import shapeless.{Generic, HList, LabelledGeneric, Poly2}
import shapeless.ops.hlist.{IsHCons, Mapped, ZipWith}
import shapeless.ops.record.Keys

def mkInstance[A, B, L <: HList, H, T <: HList, OptT <: HList, L1 <: HList, T1 <: HList, T2 <: HList, K <: HList](
  before: A, chg: B
)(implicit
  // checking that field names in tail of A are equal to field names in B  
  aLabelledGeneric: LabelledGeneric.Aux[A, L1],
  bLabelledGeneric: LabelledGeneric.Aux[B, T2],
  isHCons1: IsHCons.Aux[L1, _, T1],
  keys: Keys.Aux[T1, K],
  keys1: Keys.Aux[T2, K],
  // checking that field types in B are Options of field types in tail of A 
  aGeneric: Generic.Aux[A, L],
  isHCons: IsHCons.Aux[L, H, T],
  mapped: Mapped.Aux[T, Option, OptT],
  bGeneric: Generic.Aux[B, OptT],
  zipWith: ZipWith.Aux[OptT, T, getOrElsePoly.type, T],
): A = {
  val aHList = aGeneric.to(before)
  aGeneric.from(isHCons.cons(isHCons.head(aHList), zipWith(bGeneric.to(chg), isHCons.tail(aHList))))
}

object getOrElsePoly extends Poly2 {
  implicit def cse[A]: Case.Aux[Option[A], A, A] = at(_ getOrElse _)
}

Since all the classes are known at compile time, it is better to use compile-time reflection (macros themselves, or macros hidden inside type classes as in Shapeless), but in principle runtime reflection can also be used:

import scala.reflect.ClassTag
import scala.reflect.runtime.{currentMirror => rm}
import scala.reflect.runtime.universe._

def mkInstance[A: TypeTag : ClassTag, B: TypeTag : ClassTag](before: A, chg: B): A = {
  val A = typeOf[A]
  val B = typeOf[B]
  val classAccessors = A.decls.collect {
    case m: MethodSymbol if m.isCaseAccessor => m
  }.toList
  val arg = rm.reflect(before).reflectMethod(classAccessors.head)()
  val args = classAccessors.tail.map(m =>
    rm.reflect(chg).reflectMethod(B.decl(m.name).asMethod)()
      .asInstanceOf[Option[_]].getOrElse(
        rm.reflect(before).reflectMethod(m)()
      )
  )
  rm.reflectClass(A.typeSymbol.asClass)
    .reflectConstructor(A.decl(termNames.CONSTRUCTOR).asMethod)(arg :: args : _*)
    .asInstanceOf[A]
}
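
All three variants above apply the same field-wise rule: keep the first field of `before`, then for each remaining field take the `chg` value if it is defined and fall back to `before` otherwise. As a minimal, dependency-free sketch of that rule only (not a replacement for the typed solutions), the same merge can be written against `Product.productIterator`. The helper name `mergedFields` is hypothetical, and the result is an untyped `List[Any]` rather than an instance of `A`:

```scala
case class MyChgClass(b: Option[String], c: Option[String], d: Option[String])
case class MyFullClass(k: Int, b: String, c: String, d: String)

// Hypothetical helper: merges field values positionally, without any
// compile-time check that the field names or types line up.
def mergedFields(before: Product, chg: Product): List[Any] = {
  val beforeValues = before.productIterator.toList
  val chgValues = chg.productIterator.toList
  require(
    chgValues.length == beforeValues.length - 1,
    "chg must have exactly one field fewer than before"
  )
  // Keep the first field of `before`, then prefer each defined `chg` value.
  beforeValues.head :: chgValues.zip(beforeValues.tail).map {
    case (Some(newValue), _) => newValue
    case (None, oldValue)    => oldValue
    case (other, _) =>
      throw new IllegalArgumentException(s"expected an Option but got $other")
  }
}

val before = MyFullClass(1, "old", "old", "old")
val chg = MyChgClass(Some("new"), Some("new"), None)
val merged = mergedFields(before, chg)
// merged == List(1, "new", "new", "old")
```

Unlike the macro and Shapeless versions, this sketch only detects a mismatch between the two classes at runtime, which is the trade-off for having no dependencies.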
Dmytro Mitin
  • Will be looking later. – thebluephantom Feb 04 '23 at 11:37
  • I am not sure anyone I know is at this level. Worthy of a bounty. – thebluephantom Feb 04 '23 at 11:40
  • Looks good. Need to try. – thebluephantom Feb 05 '23 at 22:32
  • macro gives error in databricks shell – thebluephantom Feb 06 '23 at 19:50
  • command-668934795439029:4: error: macro implementation reference has wrong shape. required: macro [<static object>].<method name>[[<type args>]] or macro [<macro bundle>].<method name>[[<type args>]] note: macro definition is not supported in the REPL when using -Yrepl-classbased. def mkInstance[A, B](before: A, chg: B): A = macro mkInstanceImpl[A] – thebluephantom Feb 06 '23 at 19:51
  • experimental and all that...reliable? – thebluephantom Feb 06 '23 at 19:51
  • I have added a bounty for you. Cannot get the first approach to work. Second approach fine. Trying to follow. Maybe we can do it in a chat tomorrow. – thebluephantom Feb 06 '23 at 20:06
  • @thebluephantom I have never run macros in databricks shell but I guess it should be similar to (based on one of) Scala REPL or Spark shell. Your error message says `macro definition is not supported in the REPL when using -Yrepl-classbased`. This means the REPL is run with scalac option `-Yrepl-classbased` which makes using macros in the REPL impossible. You should try to find the way to run the REPL with `-Yrepl-classbased` switched off. – Dmytro Mitin Feb 07 '23 at 04:39
  • @thebluephantom For example in [Scala-cli](https://scala-cli.virtuslab.org) this can be done with command `scala-cli repl --dep org.scala-lang:scala-reflect:2.13.10 --scala-version 2.13.10 --jvm 8 --scalac-option -Yrepl-class-based:false` https://gist.github.com/DmytroMitin/352964368a47642e4bb902eec59cfc80 – Dmytro Mitin Feb 07 '23 at 04:39
  • @thebluephantom I added one more solution, with runtime reflection – Dmytro Mitin Feb 07 '23 at 04:57
  • @thebluephantom I slightly modified the macro. I replaced `$m` with `${m.name}` in one place. We can't call a method by method symbol if the method symbol is from a different class. You can check that the macro is working at https://scastie.scala-lang.org/bXq5FHb3QuC5PqlhZOfiqA – Dmytro Mitin Feb 07 '23 at 05:04
  • @thebluephantom Scala REPL can be run with `scala -Yrepl-class-based:false`, I guess Spark shell can be run with `spark-shell -Yrepl-class-based:false` too. Try to add this scalac option. Alternatively, you can create a normal Scala/sbt project with a macro, package it into jar and use this jar at databricks. – Dmytro Mitin Feb 07 '23 at 05:18
  • @thebluephantom *"experimental and all that...reliable?"* "Experimental" means that Scala 2 macros are completely different from Scala 3 macros (which are not experimental). Many Scala 2 libraries use macros. – Dmytro Mitin Feb 07 '23 at 05:20
  • Excellent for sure. – thebluephantom Feb 07 '23 at 06:52
  • Actually, the run-time is better and I will try it. Thx for that. Here in the example I hard-coded things whereas we will get data from files for the schema - sort of registry approach. I am not sure compile time will work that well with such an approach. Or am I missing something? I found one person familiar with shapeless etc. – thebluephantom Feb 07 '23 at 08:46
  • @thebluephantom Well, sometimes compile-time reflection can be not enough and runtime reflection is necessary. Runtime reflection has runtime overhead. In your specific use case I can't see why compile-time techniques are now not enough. The classes `MyChgClass`, `MyFullClass` (their field names and types) are known at compile time, aren't they? `before`, `chg` and everything else can be runtime values. What exactly is hardcoded? – Dmytro Mitin Feb 07 '23 at 10:24
  • We need to get the definitions for the records to be processed from a file. they change sometimes and we do not want hard-coding in program - in real life. – thebluephantom Feb 07 '23 at 10:26
  • @thebluephantom Depends on what definitions and how. If classes `MyChgClass`, `MyFullClass` are known at compile-time then compile-time techniques are enough. If they are generated at runtime then runtime techniques are necessary (maybe sometimes not even runtime reflection from `scala-reflect` but runtime compilation like using `scala.tools.reflect.ToolBox` from `scala-compiler` etc.). – Dmytro Mitin Feb 07 '23 at 10:48
  • @thebluephantom My runtime-reflection method `mkInstance` also expects now that types becoming `A`, `B` are known at compile time. You can see the usage of `TypeTag`/`ClassTag`, these type classes persist type/class information from compile time to runtime. Feel free to open a new question with [MCVE](https://stackoverflow.com/help/minimal-reproducible-example) without hardcoded things and we'll see whether you need compile-time or runtime reflection. – Dmytro Mitin Feb 07 '23 at 10:48
  • OK, will do. Interesting point. – thebluephantom Feb 07 '23 at 11:32
  • Indeed I was a little unclear. My humblest apologies. – thebluephantom Feb 07 '23 at 14:57
  • Did u get the bounty? We can hardcode the schema in code and github action ci cd the changed schema periodically. – thebluephantom Feb 07 '23 at 19:31
  • @thebluephantom No, not yet. It's written at the top: "The bounty expires in 6 days. Answers to this question are eligible for a +50 reputation bounty." – Dmytro Mitin Feb 07 '23 at 19:42
  • It stated I could award already. Interesting. – thebluephantom Feb 07 '23 at 19:57
  • @thebluephantom Thanks for the bounty. I received it. Here is an example I kept in mind while talking about runtime-generated classes https://scastie.scala-lang.org/DmytroMitin/FN64ZX8GSOO4bHOL1w6HUw Field information (field names and types) for classes `MyChgClass`, `MyFullClass` is parsed now from runtime strings. The classes are generated at runtime. `mkInstance1` is not using compile-time information at all. – Dmytro Mitin Feb 08 '23 at 12:40
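
The comments above note that `TypeTag`/`ClassTag` carry type information from compile time to runtime. A minimal sketch of that idea, using only `ClassTag` from the standard library (the helper name `runtimeClassName` is made up for illustration):

```scala
import scala.reflect.{ClassTag, classTag}

// The compiler supplies the ClassTag at the call site, so the class of A
// is still available at runtime even though A itself is erased.
def runtimeClassName[A: ClassTag]: String = classTag[A].runtimeClass.getName

val stringName = runtimeClassName[String]
// stringName == "java.lang.String"

// A ClassTag only records the erased class, so type arguments are lost here;
// recovering them would need a TypeTag from scala-reflect instead.
val listName = runtimeClassName[List[Int]]
```

This is why the runtime-reflection `mkInstance` still needs `A` and `B` to be known where it is called: the tags are created by the compiler, not discovered at runtime.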

Although the title of the question is a little different, I asked how to process data generically from what is ostensibly a CDC feed. This approach handles the case above as well, plus some aspects of schema evolution, so it may be of interest to others.

A simpler approach: using compile-time reflection and allowing for schema changes, JSON data from CDC via Kafka can be handled as shown below for two types of records from Shareplex in the same feed. You will need to adapt it to your own situation:

...
import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._
import org.apache.spark.sql.functions.{col, lit, when, from_json, map_keys, map_values, regexp_replace, coalesce}
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{MapType, StringType, StructType, IntegerType}

case class MyMeta(op: String, table: String)
val metaSchema = Encoders.product[MyMeta].schema
case class MySales(NUM: Option[Integer], PRODUCT_ID: Option[String], DESCRIPTION: Option[String], OLD_FIELD_1: Option[String]) 
val salesSchema = Encoders.product[MySales].schema
case class MyProducts(PRODUCT_ID: Option[String], DESCRIPTION: Option[String], PRICE: Option[Int], OLD_FIELD_1: Option[String]) 
val ProductsSchema = Encoders.product[MyProducts].schema

def getAfterImage(op: String, data: String, key: String, jsonOLD_TABLE_FIELDS: String): String = {
  val jsonOLD_FIELDS = parse(jsonOLD_TABLE_FIELDS)
  val jsonData = parse(data)
  val jsonKey = parse(key)

  op match {
    // Insert: the after image is the data merged with the old fields
    case "ins" =>
      compact(render(jsonData merge jsonOLD_FIELDS))
    // Any other operation: diff the key against the data, then merge the
    // changed and deleted parts with the old fields
    case _ =>
      val Diff(changed, added, deleted) = jsonKey diff jsonData
      compact(render(changed merge deleted merge jsonOLD_FIELDS))
  }
}
val afterImage = spark.udf.register("callUDFAI", getAfterImage _)

val path = "/FileStore/tables/json_DE_CDC_SHAREPLEX_file.txt"  
val df = spark.read.text(path)  // String.
val df2 = df.withColumn("value", from_json(col("value"), MapType(StringType, StringType)))    
val df3 = df2.select(map_values(col("value")))  
val df4 = df3
  .select(
    $"map_values(value)"(0).as("meta"),
    $"map_values(value)"(1).as("data"),
    $"map_values(value)"(2).as("key"))
  .withColumn("parsedMeta", from_json(col("meta"), metaSchema))
  .drop("meta")
  .select(col("parsedMeta.*"), col("data"), col("key"))
  .withColumn("key2", coalesce(col("key"), lit(""" { "DUMMY_FIELD_XXX": ""} """)))
  .toDF()
  .cache()

val df_sales    = df4.filter('table === "BILL.SALES") 
val df_products = df4.filter('table === "BILL.PRODUCTS")
val df_sales_output = df_sales.withColumn("afterImage", afterImage(col("op"), col("data"), col("key2") , lit(""" { "OLD_FIELD_1": ""} """)))
                              .select("afterImage") 
val df_products_output = df_products.withColumn("afterImage", afterImage(col("op"), col("data"), col("key2") , lit(""" { "OLD_FIELD_A":"", "OLD_FIELD_B":""} """)))
                                    .select("afterImage")                          
val df_sales_output_final = df_sales_output.withColumn("parsedSales", from_json(col("afterImage"), salesSchema)) 
...
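
The `getAfterImage` function above relies on the json4s `merge` and `diff` operations. As a small sketch of what those two calls do in isolation, assuming json4s-jackson is on the classpath (it normally is in a Spark environment) and using made-up sample records rather than real Shareplex data:

```scala
// Assumes json4s-jackson is available on the classpath.
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical sample values, not taken from the feed above.
val data = parse("""{"PRODUCT_ID": "P1", "PRICE": 10}""")
val oldFields = parse("""{"OLD_FIELD_1": ""}""")

// merge combines the fields of both objects into one object.
val merged = compact(render(data merge oldFields))
// merged == """{"PRODUCT_ID":"P1","PRICE":10,"OLD_FIELD_1":""}"""

// diff compares two documents and reports three parts:
// values that changed, fields that were added, and fields that were deleted.
val key = parse("""{"PRODUCT_ID": "P1", "PRICE": 8}""")
val Diff(changed, added, deleted) = key diff data
```

Adding the retired columns through `merge` is what lets records produced before and after a schema change be parsed with the same case class schema.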
thebluephantom