
I am using the Scala interpreter (IMain) to evaluate Scala statements that come from configuration.

Sample code is:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain

object BSFTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("TEST")
      .setMaster("local") // spark://127.0.0.1:7077

    val sparkSession = SparkSession.builder()
      .appName("TEST")
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    import sparkSession.sql

    sql ("DROP DATABASE IF EXISTS test CASCADE")
    sql(s"CREATE DATABASE test")

    sql ("CREATE TABLE test.box_width (id INT, width INT)")
    sql ("INSERT INTO test.box_width VALUES (1,1), (2,2)")

    sql ("CREATE TABLE test.box_length (id INT, length INT)")
    sql ("INSERT INTO test.box_length VALUES (1,10), (2,20)")

    val widthDF:DataFrame = sql("select *  from  test.box_width")
    val lengthDF = sql("select *  from  test.box_length")

    val settings = new Settings
    settings.usejavacp.value = true
    settings.deprecation.value = true
    settings.embeddedDefaults(this.getClass().getClassLoader())

    val eval = new IMain(settings)
    eval.bind("lengthDF", "org.apache.spark.sql.DataFrame", lengthDF)
    eval.bind("widthDF", "org.apache.spark.sql.DataFrame", widthDF)
    val clazz1 = "lengthDF.join(widthDF, \"id\")" //STATEMENT FROM CONFIGURATION 
    val evaluated = eval.interpret(clazz1)
    val res = eval.valueOfTerm("res0").get.asInstanceOf[DataFrame]
    println("PRINT SCHEMA: " + res.schema) //This statement is running fine
    res.show() //EXCEPTION HERE
  }
}

I am getting the following error when executing the code:

lengthDF: org.apache.spark.sql.DataFrame = [id: int, length: int]
widthDF: org.apache.spark.sql.DataFrame = [id: int, width: int]
res0: org.apache.spark.sql.DataFrame = [id: int, length: int ... 1 more field]
PRINT SCHEMA: StructType(StructField(id,IntegerType,true), StructField(length,IntegerType,true), StructField(width,IntegerType,true))
18/10/24 15:08:14 ERROR CodeGenerator: failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Class 'org.apache.spark.sql.catalyst.expressions.codegen.GeneratedClass' was loaded through a different loader
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificSafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
/*....
Caused by: org.codehaus.janino.InternalCompilerException: Class 'org.apache.spark.sql.catalyst.expressions.codegen.GeneratedClass' was loaded through a different loader
    at org.codehaus.janino.SimpleCompiler$2.getDelegate(SimpleCompiler.java:410)
    at org.codehaus.janino.SimpleCompiler$2.accept(SimpleCompiler.java:353)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6130)

FULL LOG

I am not able to understand why res.show (which retrieves and prints data from the DataFrame) throws the exception even though res.schema (which fetches the schema from the DataFrame) runs as expected.
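
Here is a minimal sketch of the split I am seeing, assuming the difference is between metadata-only calls (which never trigger Spark's code generation) and actions (which materialize rows through generated code). res.columns and res.collect() are my assumptions; I have only verified schema and show:

// Metadata-only calls read the analyzed plan on the driver: no job, no codegen
println(res.schema)                 // verified: works
println(res.columns.mkString(", ")) // assumption: should also work, no rows touched

// Actions materialize rows and trigger Janino code generation
res.show()                          // verified: throws InternalCompilerException
// res.collect()                    // assumption: presumably fails the same way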

Versions:

scalaVersion := "2.11.11"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.2.2"

What can I do to solve the issue?


1 Answer


I have solved the issue, taking reference from https://stackoverflow.com/a/6164608/811602.

Now I am creating and loading the class at runtime. Here is the working code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain

import java.util.concurrent.atomic.AtomicInteger

object DynamicClassLoader {

  val offset = new AtomicInteger() // generates a unique suffix for each generated class name

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("TEST")
      .setMaster("local") // spark://127.0.0.1:7077

    val sparkSession = SparkSession.builder()
      .appName("TEST")
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    import sparkSession.sql

    sql ("DROP DATABASE IF EXISTS test CASCADE")
    sql(s"CREATE DATABASE test")

    sql ("CREATE TABLE test.box_width (id INT, width INT)")
    sql ("INSERT INTO test.box_width VALUES (1,1), (2,2)")

    sql ("CREATE TABLE test.box_length (id INT, length INT)")
    sql ("INSERT INTO test.box_length VALUES (1,10), (2,20)")

    val widthDF = sql("select *  from  test.box_width")
    val lengthDF = sql("select *  from  test.box_length")

    val udfclassName: String = "AClass" + offset.getAndIncrement()
    val statements = """
        | val result = input1.join(input2, "id")
        | return result
        | """.stripMargin
    val srcA = s"""
      |class $udfclassName extends SomeTrait {
      |  import org.apache.spark.sql.DataFrame
      |  def someMethod(input1: DataFrame, input2: DataFrame): DataFrame = {
      |    $statements
      |  }
      |}
      """.stripMargin

    val settings = new Settings
    settings.usejavacp.value = true
    settings.deprecation.value = true
    settings.embeddedDefaults(this.getClass().getClassLoader())

    val eval = new IMain(settings)
    eval.compileString(srcA) // compile the generated source; returns true on success
    val classA = eval.classLoader.loadClass(udfclassName)
    eval.close()

    val objA = classA.newInstance().asInstanceOf[SomeTrait]
    val resultDF = objA.someMethod(lengthDF, widthDF)
    println(resultDF.schema)
    resultDF.show()
  }
}

trait SomeTrait { def someMethod(input1: DataFrame, input2: DataFrame): DataFrame }
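
For reference, with the sample tables above, resultDF.show() should print something like this (row order is not guaranteed):

+---+------+-----+
| id|length|width|
+---+------+-----+
|  1|    10|    1|
|  2|    20|    2|
+---+------+-----+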

Though I am no longer blocked by the posted question and have figured out an alternate way to achieve the same result, the question is still open, as the root cause of the exception is yet to be found and solved.
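
One untested idea, based purely on my assumption about the root cause: Spark's CodeGenerator hands Janino the thread's context classloader as the parent loader, so if IMain leaves its REPL classloader installed on the thread, GeneratedClass may later be resolved through a different loader than the one that already loaded it. Restoring the application classloader before the action might avoid the mismatch (this sketch is not verified):

// Untested sketch: restore the application classloader before triggering
// execution, assuming the interpreter replaced the thread's context
// classloader with its own REPL loader.
val appLoader = this.getClass.getClassLoader
Thread.currentThread().setContextClassLoader(appLoader)
res.show()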
