
What is Needed:

The number of tables in the source database changes rapidly, so I don't want to edit case classes by hand; instead I generate them dynamically through Scala code and put them in a package. But now I am not able to read them dynamically. If this works, I would substitute each table name into "com.example.datasources.fileSystemSource.schema.{}" and loop over the members of object schema.

What has already been Done:

I have some case classes dynamically generated from the schema of the database tables, as below:

import java.sql.Timestamp

object schema {
case class Users(name: String,
                 favorite_color: String,
                 favorite_numbers: Array[Int])

case class UserData(registration_dttm: Timestamp,
                    id: Int,
                    first_name: String,
                    last_name: String,
                    email: String,
                    gender: String,
                    ip_address: String,
                    cc: String,
                    country: String,
                    birthdate: String,
                    salary: Double,
                    title: String,
                    comments: String)
}

Then I have used them as the dynamic type `T` to read data in the `Load[T]` class in my Loader.scala, as below:

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

class Load[T <: Product: Encoder](val tableName: String,
                                  val inputPath: String,
                                  val spark: SparkSession,
                                  val saveMode: String,
                                  val outputPath: String,
                                  val metadata: Boolean)
    extends Loader[T] {

  val fileSystemSourceInstance: FileSystem[T] =
    new FileSystem[T](inputPath, spark, saveMode, tableName)

  override def Load: Dataset[T] =
    fileSystemSourceInstance.provideData(metadata, outputPath).as[T]

}

Now, by using `scala.reflect.api`, I am able to get a `TypeTag` for my case classes:

import scala.reflect.api
import scala.reflect.runtime.universe._

def stringToTypeTag[A](name: String): TypeTag[A] = {
  val c = Class.forName(name)
  val mirror = runtimeMirror(c.getClassLoader)
  val sym = mirror.staticClass(name)
  val tpe = sym.selfType
  TypeTag(mirror, new api.TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
      if (m eq mirror) tpe.asInstanceOf[U#Type]
      else throw new IllegalArgumentException(
        s"Type tag defined in $mirror cannot be migrated to other mirrors.")
  })
}

So if I now print my case class's type tag, I get:

val typetagDynamic = stringToTypeTag("com.example.datasources.fileSystemSource.schema.Users")
println(typetagDynamic)
// TypeTag[com.example.datasources.fileSystemSource.schema.Users]
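As a sanity check (not part of the original post), the same helper can be exercised against a standard-library class; here `scala.Some` merely stands in for a dynamically generated schema class, and the helper is reproduced so the snippet is self-contained:

```scala
import scala.reflect.api
import scala.reflect.runtime.universe._

// Same helper as above, reproduced so this snippet compiles on its own.
def stringToTypeTag[A](name: String): TypeTag[A] = {
  val c = Class.forName(name)
  val mirror = runtimeMirror(c.getClassLoader)
  val sym = mirror.staticClass(name)
  val tpe = sym.selfType
  TypeTag(mirror, new api.TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
      if (m eq mirror) tpe.asInstanceOf[U#Type]
      else throw new IllegalArgumentException(
        s"Type tag defined in $mirror cannot be migrated to other mirrors.")
  })
}

// "scala.Some" stands in for the dynamically generated schema class name.
val tag = stringToTypeTag[Any]("scala.Some")
println(tag.tpe.typeSymbol.fullName) // scala.Some
```

This confirms the helper resolves a fully qualified name known only at runtime into a usable `TypeTag` value; the remaining problem is that a value cannot be used where a static type parameter is required.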

Problem:

I need to use this `TypeTag`, or the dynamically generated case classes, to encode my datasets as below:

new Load[typetagDynamic](tableName, inputPath, spark,
  saveMode,
  outputPath + tableName,
  metadata)(Encoders.product[typetagDynamic]).Load

This gives me the error: `Cannot resolve symbol typetagDynamic`.

If used like this:

new Load[typetagDynamic.type](tableName, inputPath, spark,
  saveMode,
  outputPath + tableName,
  metadata)(Encoders.product[typetagDynamic.type]).Load

This gives me the error: `type arguments [T] do not conform to method product's type parameter bounds [T <: Product]`.

  • `typetagDynamic` is a value, not a type. – Dmytro Mitin Sep 24 '20 at 11:40
  • @DmytroMitin Thanks, yes, but I don't know how I can use it for the Encoder of the Dataset, so I am trying this. –  Sep 24 '20 at 11:44
  • Why can't you call directly `new Load[schema.Users](tableName,inputPath,spark, saveMode, outputPath + tableName, metadata).Load` with static type `schema.Users` without dynamic string `"com.example.datasources.fileSystemSource.schema.Users"`? – Dmytro Mitin Sep 24 '20 at 11:45
  • @DmytroMitin That is the whole issue: the number of tables in the source database changes rapidly, so I don't want to edit case classes by hand; I generate them dynamically through Scala code and put them in a package. But now I am not able to read them dynamically. If this works, then I would parse "com.example.datasources.fileSystemSource.schema.{}" as members of object schema in a loop. –  Sep 24 '20 at 11:48
  • `typetagDynamic.type` can't be correct. `new Foo[Bar](...)` can be used if you know `Foo` and `Bar` statically (at compile time). Otherwise you'll have to create an instance using reflection. – Dmytro Mitin Sep 24 '20 at 11:51
  • @DmytroMitin Can you suggest how to "create an instance using reflection"? Do you mean like this? It's not working: `val typetagDynamic = stringToTypeTag("com.example.datasources.fileSystemSource.schema.Users"); val classSymbol = mirror.classSymbol(typetagDynamic); val classType = classSymbol.toType; typetagDynamic.newInstance.asInstanceOf(classType)` –  Sep 24 '20 at 12:00
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/222006/discussion-between-vermaabhishek-and-dmytro-mitin). –  Sep 24 '20 at 12:05
  • Generating code to access it "dynamically" through reflection appears to be quite pointless to me. Why not access the DB "dynamically"? If you're not able to use the generated identifiers of your code for programming, then code generation makes no sense. – ziggystar Sep 24 '20 at 12:52
  • If you really need something like a dynamic repository, I'd use the (dynamic) DataFrame API. On the other hand, a static generator would have the additional benefit of being able to code some parts using the Dataset API, and generated case classes are useful for providing test data, for example. So I'd rather trigger the build pipeline including the static generator when the repository changes. – Beryllium Jun 09 '22 at 09:20

1 Answer

If you know the type `schema.Users` only at runtime, try to replace

new Load[schema.Users](tableName,inputPath,spark,
  saveMode,
  outputPath + tableName,
  metadata).Load

with

import scala.reflect.runtime
import scala.reflect.runtime.universe._

val currentMirror = runtime.currentMirror

val loadType = typeOf[Load[_]]
val classSymbol = loadType.typeSymbol.asClass
val classMirror = currentMirror.reflectClass(classSymbol)
val constructorSymbol = loadType.decl(termNames.CONSTRUCTOR).asMethod
val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
  
import scala.tools.reflect.ToolBox
val toolbox = ToolBox(currentMirror).mkToolBox()
val encoderType = appliedType(
  typeOf[Encoder[_]].typeConstructor.typeSymbol,
  currentMirror.staticClass("com.example.datasources.fileSystemSource.schema.Users").toType
)
val encoderTree = toolbox.inferImplicitValue(encoderType, silent = false)
val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))

constructorMirror(tableName,inputPath,spark,
  saveMode,
  outputPath + tableName,
  metadata, encoderInstance).asInstanceOf[Load[_]].Load

Running this may fail with:

scala.tools.reflect.ToolBoxError: implicit search has failed

You need either:

  1. to define an instance of the type class `org.apache.spark.sql.Encoder` for `Users` in its companion object (so that the instance will be in implicit scope):

    object Users {
      implicit val usersEnc: Encoder[Users] = spark.implicits.newProductEncoder[Users]
    }
    

or

  2. to import instances of `Encoder` for case classes via `import spark.implicits._`, but you need to import them not into the current local scope but into the toolbox-generated local scope; so in this case you should replace

    val encoderTree = toolbox.inferImplicitValue(encoderType, silent = false)
    val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))
    

    with

    val className = "com.example.datasources.fileSystemSource.schema.Users"
    val classType = currentMirror.staticClass(className).toType
    val encoderInstance = toolbox.eval(
      q"""import path.to.spark.implicits._
          import org.apache.spark.sql.Encoder
          implicitly[Encoder[$classType]]""")
    
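The toolbox implicit search at the heart of this answer can be sketched without Spark at all. In this illustrative snippet (not from the original answer) `Ordering[Int]` stands in for `Encoder[Users]`: `Ordering` instances live in the companion object, i.e. in implicit scope, which is exactly what the failing `Encoder` search was missing.

```scala
import scala.reflect.runtime.universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

val toolbox = currentMirror.mkToolBox()

// Build the type Ordering[Int] at runtime, the same way the answer
// builds Encoder[Users] with appliedType.
val orderingType = appliedType(typeOf[Ordering[_]].typeConstructor, typeOf[Int])

// Ask the compiler (via the toolbox) to run implicit search for that type.
// This succeeds because Ordering[Int] is found in implicit scope.
val tree = toolbox.inferImplicitValue(orderingType, silent = false)
val ordering = toolbox.eval(toolbox.untypecheck(tree)).asInstanceOf[Ordering[Int]]

println(ordering.compare(1, 2) < 0) // true
```

If the instance is only reachable via a local import (as with `spark.implicits._`), `inferImplicitValue` throws `ToolBoxError: implicit search has failed`, which is why option 2 moves the import inside the toolbox-compiled code.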

See the whole code: https://gist.github.com/DmytroMitin/2cad52c27f5360ae9b1e7503d6f6cd00

See also: https://groups.google.com/g/scala-internals/c/ta-vbUT6JE8
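Stripped of the Spark-specific pieces, the reflective constructor call used above (`reflectClass` / `reflectConstructor` / `constructorMirror(...)`) can be sketched with a standard-library case class; `scala.Some` stands in here for a class known only by its name at runtime:

```scala
import scala.reflect.runtime.{universe => ru}

val mirror = ru.runtimeMirror(getClass.getClassLoader)

// Look the class up by its fully qualified name, as the answer does for Load.
val classSym = mirror.staticClass("scala.Some")
val classMirror = mirror.reflectClass(classSym)

// Find the primary constructor symbol and get a mirror for it.
val ctorSym = classSym.toType.decl(ru.termNames.CONSTRUCTOR).asMethod
val ctorMirror = classMirror.reflectConstructor(ctorSym)

// Invoke the constructor reflectively, as constructorMirror(...) does above.
val instance = ctorMirror(42)
println(instance) // Some(42)
```

Note that the result comes back statically typed as `Any`, which is why the answer has to cast to `Load[_]` before calling `.Load`.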

Dmytro Mitin
  • After a lot of debugging I am getting the following error in the code below; I think somehow the tree got corrupted: `val encoderTree = toolbox.inferImplicitValue(encoderType, silent = false); val encoderInstance: Any = toolbox.eval(toolbox.untypecheck(encoderTree))` throws `scala.tools.reflect.ToolBoxError: implicit search has failed`. –  Sep 29 '20 at 21:59
  • @VermaAbhishek It means the compiler can't find an implicit `Encoder[Users]`. Did you define or import one? – Dmytro Mitin Sep 29 '20 at 22:07
  • It is showing the same error as before even if I load it like below: `import com.example.datasources.fileSystemSource.schema.Users; println(Users.getClass.getName.replace("$", "")) // com.example.datasources.fileSystemSource.schema.Users; println(ScalaClassLoader(getClass.getClassLoader).tryToLoadClass(Users.getClass.getName.replace("$", ""))) // Some(class com.example.datasources.fileSystemSource.schema.Users); val encoderType = appliedType(typeOf[Encoder[_]].typeConstructor.typeSymbol, currentMirror.staticClass(Users.getClass.getName.replace("$", "")).toType)` –  Oct 01 '20 at 07:54