
I'm running a Scala Spark job using Zeppelin. When I run it I get the error below:

latestForEachKey: org.apache.spark.sql.DataFrame = [PartitionStatement_1: string, PartitionYear_1: string ... 64 more fields]
<console>:393: error: Could not write class $$$$2e6199f161363585e7ae9b28bcf8535e$$$$iw because it exceeds JVM code size limits. Method <init>'s code too large!
class $iw extends Serializable {

Sometimes I don't get the error and it works. What can I do to fix this problem?

Here is the code that I run:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._ // udf, input_file_name, when, date_format, regexp_replace, etc.
import java.io.File
import org.apache.hadoop.fs._

val getPartition = spark.udf.register("getPartition", (filePath: String) => filePath.split("\\.")(3))
val getYearAndStatementTypeCodePartition = spark.udf.register("getYearAndStatementTypeCodePartition", (filePath: String) => filePath.split("\\.")(4))
val get_partition_Year = spark.udf.register("get_partition_Year", (partition: String) => partition.split("-")(0))
val get_partition_Statement = spark.udf.register("get_partition_Statement", (partition: String) => partition.split("-")(1))

val rdd = sc.textFile("s3://trfsmallfffile/FinancialStatementLineItem/MAIN")
val header = rdd.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols, StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal=data.withColumn("DataPartition", getPartition(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_partition_Year(getYearAndStatementTypeCodePartition(input_file_name)))
val df1resultFinalWithAllPartition=df1resultFinalWithYear.withColumn("PartitionStatement", get_partition_Statement(getYearAndStatementTypeCodePartition(input_file_name)))

val df1resultFinalwithTimestamp=df1resultFinalWithAllPartition
.withColumn("CapitalChangeAdjustmentDate",date_format(col("CapitalChangeAdjustmentDate"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("FinancialStatementLineItemValue", regexp_replace(format_number($"FinancialStatementLineItemValue".cast(DoubleType), 5), ",", ""))
.withColumn("AdjustedForCorporateActionValue", regexp_replace(format_number($"AdjustedForCorporateActionValue".cast(DoubleType), 5), ",", ""))
.withColumn("IsAsReportedCurrencySetManually", regexp_replace(format_number($"IsAsReportedCurrencySetManually".cast(DoubleType), 5), ",", ""))
.withColumn("ItemDisplayedValue", regexp_replace(format_number($"ItemDisplayedValue".cast(DoubleType), 5), ",", ""))
.withColumn("ReportedValue", regexp_replace(format_number($"ReportedValue".cast(DoubleType), 5), ",", ""))
.withColumn("AsReportedExchangeRate", regexp_replace(format_number($"AsReportedExchangeRate".cast(DoubleType), 5), ",", ""))
.withColumn("FinancialStatementLineItemValueUpperRange", regexp_replace(format_number($"FinancialStatementLineItemValueUpperRange".cast(DoubleType), 5), ",", ""))

//Loading Incremental 

val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialStatementLineItem/INCR")
val header1 = rdd1.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)

val df2resultTimestamp=data1
.withColumn("CapitalChangeAdjustmentDate_1",date_format(col("CapitalChangeAdjustmentDate_1"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("FinancialStatementLineItemValue_1", regexp_replace(format_number($"FinancialStatementLineItemValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("AdjustedForCorporateActionValue_1", regexp_replace(format_number($"AdjustedForCorporateActionValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("IsAsReportedCurrencySetManually_1", regexp_replace(format_number($"IsAsReportedCurrencySetManually_1".cast(DoubleType), 5), ",", ""))
.withColumn("ItemDisplayedValue_1", regexp_replace(format_number($"ItemDisplayedValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("ReportedValue_1", regexp_replace(format_number($"ReportedValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("AsReportedExchangeRate_1", regexp_replace(format_number($"AsReportedExchangeRate_1".cast(DoubleType), 5), ",", ""))
.withColumn("FinancialStatementLineItemValueUpperRange_1", regexp_replace(format_number($"FinancialStatementLineItemValueUpperRange_1".cast(DoubleType), 5), ",", ""))


import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("FinancialStatementLineItem_lineItemId", "PeriodId","SourceId","StatementTypeCode","StatementCurrencyId","uniqueFundamentalSet").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = df2resultTimestamp.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")


val dfMainOutput = df1resultFinalwithTimestamp.join(latestForEachKey, Seq("FinancialStatementLineItem_lineItemId", "PeriodId","SourceId","StatementTypeCode","StatementCurrencyId","uniqueFundamentalSet"), "outer")
      .select($"uniqueFundamentalSet",$"PeriodId",$"SourceId",$"StatementTypeCode",$"StatementCurrencyId",$"FinancialStatementLineItem_lineItemId",
        when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
        when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear").as("PartitionYear"),
        when($"PartitionStatement_1".isNotNull, $"PartitionStatement_1").otherwise($"PartitionStatement").as("PartitionStatement"),
        when($"FinancialAsReportedLineItemName_1".isNotNull, $"FinancialAsReportedLineItemName_1").otherwise($"FinancialAsReportedLineItemName").as("FinancialAsReportedLineItemName"),
        when($"FinancialAsReportedLineItemName_languageId_1".isNotNull, $"FinancialAsReportedLineItemName_languageId_1").otherwise($"FinancialAsReportedLineItemName_languageId").as("FinancialAsReportedLineItemName_languageId"),
        when($"FinancialStatementLineItemValue_1".isNotNull, $"FinancialStatementLineItemValue_1").otherwise($"FinancialStatementLineItemValue").as("FinancialStatementLineItemValue"),
        when($"AdjustedForCorporateActionValue_1".isNotNull, $"AdjustedForCorporateActionValue_1").otherwise($"AdjustedForCorporateActionValue").as("AdjustedForCorporateActionValue"),
        when($"ReportedCurrencyId_1".isNotNull, $"ReportedCurrencyId_1").otherwise($"ReportedCurrencyId").as("ReportedCurrencyId"),
        when($"IsAsReportedCurrencySetManually_1".isNotNull, $"IsAsReportedCurrencySetManually_1").otherwise($"IsAsReportedCurrencySetManually").as("IsAsReportedCurrencySetManually"),
        when($"Unit_1".isNotNull, $"Unit_1").otherwise($"Unit").as("Unit"),
        when($"IsTotal_1".isNotNull, $"IsTotal_1").otherwise($"IsTotal").as("IsTotal"),
        when($"StatementSectionCode_1".isNotNull, $"StatementSectionCode_1").otherwise($"StatementSectionCode").as("StatementSectionCode"),
        when($"DimentionalLineItemId_1".isNotNull, $"DimentionalLineItemId_1").otherwise($"DimentionalLineItemId").as("DimentionalLineItemId"),
        when($"IsDerived_1".isNotNull, $"IsDerived_1").otherwise($"IsDerived").as("IsDerived"),
        when($"EstimateMethodCode_1".isNotNull, $"EstimateMethodCode_1").otherwise($"EstimateMethodCode").as("EstimateMethodCode"),
        when($"EstimateMethodNote_1".isNotNull, $"EstimateMethodNote_1").otherwise($"EstimateMethodNote").as("EstimateMethodNote"),
        when($"EstimateMethodNote_languageId_1".isNotNull, $"EstimateMethodNote_languageId_1").otherwise($"EstimateMethodNote_languageId").as("EstimateMethodNote_languageId"),
        when($"FinancialLineItemSource_1".isNotNull, $"FinancialLineItemSource_1").otherwise($"FinancialLineItemSource").as("FinancialLineItemSource"),
        when($"IsCombinedItem_1".isNotNull, $"IsCombinedItem_1").otherwise($"IsCombinedItem").as("IsCombinedItem"),
        when($"IsExcludedFromStandardization_1".isNotNull, $"IsExcludedFromStandardization_1").otherwise($"IsExcludedFromStandardization").as("IsExcludedFromStandardization"),
        when($"DocByteOffset_1".isNotNull, $"DocByteOffset_1").otherwise($"DocByteOffset").as("DocByteOffset"),
        when($"DocByteLength_1".isNotNull, $"DocByteLength_1").otherwise($"DocByteLength").as("DocByteLength"),
        when($"BookMark_1".isNotNull, $"BookMark_1").otherwise($"BookMark").as("BookMark"),
        when($"ItemDisplayedNegativeFlag_1".isNotNull, $"ItemDisplayedNegativeFlag_1").otherwise($"ItemDisplayedNegativeFlag").as("ItemDisplayedNegativeFlag"),
        when($"ItemScalingFactor_1".isNotNull, $"ItemScalingFactor_1").otherwise($"ItemScalingFactor").as("ItemScalingFactor"),
        when($"ItemDisplayedValue_1".isNotNull, $"ItemDisplayedValue_1").otherwise($"ItemDisplayedValue").as("ItemDisplayedValue"),
        when($"ReportedValue_1".isNotNull, $"ReportedValue_1").otherwise($"ReportedValue").as("ReportedValue"),
        when($"EditedDescription_1".isNotNull, $"EditedDescription_1").otherwise($"EditedDescription").as("EditedDescription"),
        when($"EditedDescription_languageId_1".isNotNull, $"EditedDescription_languageId_1").otherwise($"EditedDescription_languageId").as("EditedDescription_languageId"),
        when($"ReportedDescription_1".isNotNull, $"ReportedDescription_1").otherwise($"ReportedDescription").as("ReportedDescription"),
        when($"ReportedDescription_languageId_1".isNotNull, $"ReportedDescription_languageId_1").otherwise($"ReportedDescription_languageId").as("ReportedDescription_languageId"),
        when($"AsReportedInstanceSequence_1".isNotNull, $"AsReportedInstanceSequence_1").otherwise($"AsReportedInstanceSequence").as("AsReportedInstanceSequence"),
        when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
        when($"FinancialStatementLineItemSequence_1".isNotNull, $"FinancialStatementLineItemSequence_1").otherwise($"FinancialStatementLineItemSequence").as("FinancialStatementLineItemSequence"),
        when($"SystemDerivedTypeCode_1".isNotNull, $"SystemDerivedTypeCode_1").otherwise($"SystemDerivedTypeCode").as("SystemDerivedTypeCode"),
        when($"AsReportedExchangeRate_1".isNotNull, $"AsReportedExchangeRate_1").otherwise($"AsReportedExchangeRate").as("AsReportedExchangeRate"),
        when($"AsReportedExchangeRateSourceCurrencyId_1".isNotNull, $"AsReportedExchangeRateSourceCurrencyId_1").otherwise($"AsReportedExchangeRateSourceCurrencyId").as("AsReportedExchangeRateSourceCurrencyId"),
        when($"ThirdPartySourceCode_1".isNotNull, $"ThirdPartySourceCode_1").otherwise($"ThirdPartySourceCode").as("ThirdPartySourceCode"),
        when($"FinancialStatementLineItemValueUpperRange_1".isNotNull, $"FinancialStatementLineItemValueUpperRange_1").otherwise($"FinancialStatementLineItemValueUpperRange").as("FinancialStatementLineItemValueUpperRange"),
        when($"FinancialStatementLineItemLocalLanguageLabel_1".isNotNull, $"FinancialStatementLineItemLocalLanguageLabel_1").otherwise($"FinancialStatementLineItemLocalLanguageLabel").as("FinancialStatementLineItemLocalLanguageLabel"),
        when($"FinancialStatementLineItemLocalLanguageLabel_languageId_1".isNotNull, $"FinancialStatementLineItemLocalLanguageLabel_languageId_1").otherwise($"FinancialStatementLineItemLocalLanguageLabel_languageId").as("FinancialStatementLineItemLocalLanguageLabel_languageId"),
        when($"IsFinal_1".isNotNull, $"IsFinal_1").otherwise($"IsFinal").as("IsFinal"),
        when($"FinancialStatementLineItem_lineItemInstanceKey_1".isNotNull, $"FinancialStatementLineItem_lineItemInstanceKey_1").otherwise($"FinancialStatementLineItem_lineItemInstanceKey").as("FinancialStatementLineItem_lineItemInstanceKey"),
        when($"StatementSectionIsCredit_1".isNotNull, $"StatementSectionIsCredit_1").otherwise($"StatementSectionIsCredit").as("StatementSectionIsCredit"),
        when($"CapitalChangeAdjustmentDate_1".isNotNull, $"CapitalChangeAdjustmentDate_1").otherwise($"CapitalChangeAdjustmentDate").as("CapitalChangeAdjustmentDate"),
        when($"ParentLineItemId_1".isNotNull, $"ParentLineItemId_1").otherwise($"ParentLineItemId").as("ParentLineItemId"),
        when($"EstimateMethodId_1".isNotNull, $"EstimateMethodId_1").otherwise($"EstimateMethodId").as("EstimateMethodId"),
        when($"StatementSectionId_1".isNotNull, $"StatementSectionId_1").otherwise($"StatementSectionId").as("StatementSectionId"),
        when($"SystemDerivedTypeCodeId_1".isNotNull, $"SystemDerivedTypeCodeId_1").otherwise($"SystemDerivedTypeCodeId").as("SystemDerivedTypeCodeId"),
        when($"UnitEnumerationId_1".isNotNull, $"UnitEnumerationId_1").otherwise($"UnitEnumerationId").as("UnitEnumerationId"),
        when($"FiscalYear_1".isNotNull, $"FiscalYear_1").otherwise($"FiscalYear").as("FiscalYear"),
        when($"IsAnnual_1".isNotNull, $"IsAnnual_1").otherwise($"IsAnnual").as("IsAnnual"),
        when($"PeriodPermId_1".isNotNull, $"PeriodPermId_1").otherwise($"PeriodPermId").as("PeriodPermId"),
        when($"PeriodPermId_objectTypeId_1".isNotNull, $"PeriodPermId_objectTypeId_1").otherwise($"PeriodPermId_objectTypeId").as("PeriodPermId_objectTypeId"),
        when($"PeriodPermId_objectType_1".isNotNull, $"PeriodPermId_objectType_1").otherwise($"PeriodPermId_objectType").as("PeriodPermId_objectType"),
        when($"AuditID_1".isNotNull, $"AuditID_1").otherwise($"AuditID").as("AuditID"),
        when($"AsReportedItemId_1".isNotNull, $"AsReportedItemId_1").otherwise($"AsReportedItemId").as("AsReportedItemId"),
        when($"ExpressionInstanceId_1".isNotNull, $"ExpressionInstanceId_1").otherwise($"ExpressionInstanceId").as("ExpressionInstanceId"),
        when($"ExpressionText_1".isNotNull, $"ExpressionText_1").otherwise($"ExpressionText").as("ExpressionText"),
         when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
        .filter(!$"FFAction|!|".contains("D|!|"))

val dfMainOutputFinal = dfMainOutput.na.fill("").select(
  $"DataPartition", $"PartitionYear", $"PartitionStatement",
  concat_ws("|^|", dfMainOutput.schema.fieldNames
    .filterNot(Seq("DataPartition", "PartitionYear", "PartitionStatement").contains)
    .map(col): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "\\|\\^\\|null", "")).withColumnRenamed("concatenated", header)



dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","PartitionYear","PartitionStatement")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/FinancialStatementLineItem/output")


val FFRowCount = dfMainOutputFinalWithoutNull.groupBy("DataPartition", "PartitionYear", "PartitionStatement").count

FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trffullfiles/FinancialStatementLineItem/Descr")

The script is very large, but I don't know how to make it smaller. Maybe by using Scala classes or something?

zero323
Atharv Thakur
2 Answers


The maximum size of a method in Java (and by extension Scala) is 64 KB of bytecode, see e.g. the question here. That means you have too much code without splitting it up into multiple methods.

In your case, I would recommend the following:

  1. Refactor parts of the code to be more concise. Especially when computing dfMainOutput there are a lot of when expressions; it should be possible to do this in a more efficient and better-looking way.

  2. Since the size constraint is per method, you can simply move some of the code into methods of its own. This has the added benefit of being easier to follow and read. For example, you could have a loadData() method that reads the input and returns a DataFrame, and another method that merges df1resultFinalwithTimestamp with latestForEachKey. You can create a method for each section of the code.
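For point 1, the long chain of `when($"X_1".isNotNull, $"X_1").otherwise($"X")` expressions can be generated from a list of column names instead of being written out by hand. A minimal sketch (the helper name `preferIncremental` is illustrative; `coalesce` has the same null-fallback semantics as the when/otherwise pairs in the question):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col}

// For every base column that has an incremental "<name>_1" counterpart,
// prefer the incremental value and fall back to the original.
// coalesce(a, b) yields a when a is non-null, otherwise b.
def preferIncremental(df: DataFrame, baseCols: Seq[String]): DataFrame =
  baseCols.foldLeft(df) { (acc, name) =>
    acc
      .withColumn(name, coalesce(col(s"${name}_1"), col(name)))
      .drop(s"${name}_1")
  }
```

Applied to the join result, something like `preferIncremental(joined, Seq("DataPartition", "PartitionYear", ...))` replaces roughly sixty hand-written when lines with one loop, which also keeps the generated class well under the bytecode limit.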

Shaido

There are several workarounds for this problem. To make the method smaller, you need to extract some things into separate methods. Therefore, replace some

val x = (huge expression)

with either:

def calculate_x = (huge expression)
val x = calculate_x

or

val x = (() => (huge expression))()

or even

lazy val x = (huge expression)

Pick the one that won't break or unnecessarily slow down your app. In your case, I'd just make dfMainOutput a lazy val and call it a day.
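In practice, the extraction can look like the sketch below. The object and method names are placeholders, and the tiny Seq bodies stand in for the large DataFrame expressions in the question; the point is only the structure:

```scala
// Each def and lazy val compiles into its own JVM method, so no single
// <init> body approaches the 64 KB limit, even if the overall script is
// just as long as before.
object Pipeline {
  def loadMain(): Seq[Int] = Seq(1, 2, 3)     // stand-in for the MAIN load
  def loadIncremental(): Seq[Int] = Seq(4, 5) // stand-in for the INCR load

  // lazy val: the expression is compiled into its own accessor method
  // and evaluated once, on first use.
  lazy val merged: Seq[Int] = loadMain() ++ loadIncremental()
}
```

In a Zeppelin paragraph you can paste such an object directly; only the final `Pipeline.merged` (or its DataFrame equivalent) runs in the wrapper class that was blowing past the size limit.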

Karol S