1

I tried getting the hint from this question but it seems it is not of help to me.

I am getting " Unable to generate an encoder for inner class " in my main class. I have created another class where i am generating my dataframe. I am using output of which is dataframe to create a val in my main class. It is compiling fine but giving me this error.

Moreover, when i keep all the FT DataParser code in main it works. It is something subtle which i am missing. I am newbie in Spark so if it too silly of a question please excuse me.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `com.DC.FTDataProject.exchangeParameters$Scrip` without access to the scope that this class was defined in.
Try moving this class out of its parent class.;

My Main class is

object MainApp extends App with Serializable  {

  @transient lazy val log = org.apache.log4j.LogManager.getLogger("ScalaSparkLogger")



  val spark: SparkSession = SparkSession.builder
    .appName("FTDataProject")
    .config("spark.some.config.option", "some-value")
    .master("local")
    .getOrCreate()

  log.info("Start of Main Programme")

  val currency = new YahooCurrencyLoader() with CurrencyParameters
  val ccy = currency.getXML(currency.ccyUrl) match {

    case Success(v) => XML.save("PreviousRun.xml",v); log.info("XML has been saved for use")
    case Failure(ex) => log.error("XML extraction failed. Look at Yahoo extraction class" + ex.getMessage )

  }


  val dp = new FTDataParser() with exchangeParameters

  val checkDF: DataFrame = dp.sparkDfCreation("Europe","22022017","Annual",spark)
  checkDF.filter(col("Symbol").equalTo("HSBA:LSE")).show()

}

My FTDataparser class is

def sparkDfCreation(d : String, p : String, a : String, s : org.apache.spark.sql.SparkSession) : org.apache.spark.sql.DataFrame = {

    val currency = new YahooCurrencyLoader() with CurrencyParameters


    val xmllocation: String = "./PreviousRun.xml"
    val loadxml: Elem = XML.loadFile(xmllocation)
    //print(loadxml)
    //print(currency.findCurrency(loadxml,"GBP"))

    log.info("USD CAD Cross is " + currency.findCurrency(loadxml,"CAD").head)

    val fn =  fileHandler(d : String, p : String, a : String)
    log.info("The filename is " + fn)

    val df = s.read.option("delimiter" , "|").option("header","true").option("inferSchema","true").csv(fn)

    log.info("The print schema is " + df.printSchema())

    import s.implicits._


    //val newdf = df.withColumn("TradeCCY", parseString($"PriceCCY"))
    //val newDF = df.withColumn("CAvgVolume",udfStringtoNumber($"AvgVolume"))

    //val a = df.columns.toList


    val checkDF = df.rdd.map { x =>
              val Close: Double = parseDouble(x(2)).getOrElse(999999)
              val Open: Double = parseDouble(x(5)).getOrElse(999999)
              val High: Double = parseDouble(x(6)).getOrElse(999999)
              val Low: Double = parseDouble(x(7)).getOrElse(999999)
              val PreviousClose: Double = parseDouble(x(8)).getOrElse(999999)
              val priceccy: String = spc(x(3))
              val realpriceccymul: Int = 1 / currency_multiplier_map(exchange_to_real_ccy_map.getOrElse(priceccy,priceccy))
              val currencyCon : Double = currency.currencyCon(loadxml,exchange_to_real_ccy_map.getOrElse(priceccy,priceccy),"GBP")
              val repccy: String = repccyFinder(spc(x(30)),spc(x(71)),spc(x(120)))
              val reppriceccymul : Int = 1 / currency_multiplier_map(exchange_to_real_ccy_map.getOrElse(repccy,repccy))
              val repcurrencyCon : Double = currency.currencyCon(loadxml, exchange_to_real_ccy_map.getOrElse(priceccy,priceccy), repccy)

              Scrip(
                stringParser(x(0)) + ":" + d + ":" + p,
                d,
                stringParser(x(0)),
                stringParser(x(1)),
                priceccy,
                stringParser(x(28)),
                stringParser(x(29)),
                parseDouble(x(4)).getOrElse(999999), // 999999 means Beta is missing.
                Close,
                Open,
                High,
                Low,
                PreviousClose,
                Close * realpriceccymul * currencyCon,
                Open * realpriceccymul * currencyCon,
                High * realpriceccymul * currencyCon,
                Low * realpriceccymul * currencyCon,
                PreviousClose * reppriceccymul * currencyCon,
                Close * reppriceccymul * repcurrencyCon,
                Open * reppriceccymul * repcurrencyCon,
                High * reppriceccymul * repcurrencyCon,
                Low * reppriceccymul * repcurrencyCon,
                PreviousClose * realpriceccymul * repcurrencyCon,
                currency.findCurrency(loadxml,"GBP").head,
                currency.findCurrency(loadxml,"AUD").head,
                currency.findCurrency(loadxml,"EUR").head,
                currency.findCurrency(loadxml,"INR").head,
                currency.findCurrency(loadxml,"JPY").head,
                currency.findCurrency(loadxml,"CAD").head,
                parseDouble(x(9)).getOrElse(999999),
                x(11).toString,
                parseDouble(x(10)).getOrElse(999999),
                x(12).toString,
                stringConvertor(x(13)),
                stringConvertor(x(14)),
                stringConvertor(x(15)),
                stringConvertor(x(17)),
                x(18).toString.trim,
                parseDouble(x(19)).getOrElse(999999), // 999999 means EPS is missing.
                x(20).toString.trim,
                parseDouble(x(21)).getOrElse(999999), // 999999 means Annual Divi is missing.
                parseDouble(stringConvertor(x(23))).getOrElse(999999), // 999999 means Annual Divi yield is missing.
                x(22).toString
      )
    }.toDF()


  return checkDF


  }

Edit :

When i changed the signature of the sparkDfCreation def sparkDfCreation(a : String, d : String, p : String, s : org.apache.spark.sql.SparkSession) : RDD[Scrip] and then in the main i added these 2 lines and it seems to worked.

  val checkRDD = dp.sparkDfCreation("Europe","22022017","Annual",spark)
  //checkDF.filter(col("Symbol").equalTo("HSBA:LSE")).show()

  val checkDF = spark.sqlContext.createDataFrame(checkRDD)
  checkDF.filter(col("Symbol").equalTo("HSBA:LSE")).show()
}                   

But it seems it is more verbose. Isn't it a bug ?

Community
  • 1
  • 1
user3341078
  • 449
  • 1
  • 5
  • 16
  • Is `Scrip` a case class? Because there seems to be a lot of fields for a case class in Scala < 2.12. – BenFradet Mar 02 '17 at 09:52
  • @BenFradet Yes Scrip is a case class. It seems when scala moved from 2.10 to 2.11 the 22 limit restriction on case class was removed though it still exists for tuples.More information [here](http://underscore.io/blog/posts/2016/10/11/twenty-two.html) – user3341078 Mar 02 '17 at 14:05
  • Yup, but since spark is not built against 2.12, I think there might be issues with the provided encoders for case classes. I might be wrong though. Have you tried using a pojo and `Encoders.bean`? – BenFradet Mar 02 '17 at 14:38
  • Try the second answer on this question https://stackoverflow.com/questions/34964565/how-to-create-a-dataset-from-custom-class-person – rajesh-nitc Jul 17 '17 at 07:44

0 Answers0