I am using Spark 1.5 CDH5.5
My requirement is I need to read each optional column from a dataframe and check if the length of that column is greater than the maxLimit, if it is greater that maxLimit then i need to apply a substring on that column
if the value of description is greater than 33 then i need to apply substring(0,33) If the value of emr_id is greater than 34 then i need to apply substring(0,34)
To achieve this , I need to pass some extra string arguments to a spark udf, It throws the below error
I am calling a udf for each column as below
Please help me on what went wrong?
scala> updatedDF.printSchema
root
|-- data_id: string (nullable = true)
|-- description: string (nullable = true)
|-- emr_id: string (nullable = true)
val METADATA_ATTRIBUTES_SIZE_SCHEMA = List(
("data_id","data_id","string","mandatory","40"),
("description","description","string","optional","33"),
("emr_id","emr_id","string","optional","34")
)
updatedDF = updatedDF.formatMetaData(METADATA_ATTRIBUTES_SIZE_SCHEMA)
I have the below trait which has lots of methods that can be directly invoked by any dataframe
trait MetaDataOperations {
val df: DataFrame
def formatMetaData(schema: List [(String,String,String,String,String)]) :DataFrame = {
var formattedDF = df
val getFormattedMetaData= udf(getFormattedMetaDataUdf _)
for(column <- formattedDF.columns){
val schemaList = schema.filter(tuple => tuple._1.equals(column))
val attributeDataType = schemaList.head._3.toString
val attributeConformance = schemaList.head._4.toString
val attributeMaxSize = schemaList.head._5.toString
formattedDF = formattedDF.withColumn(column, getFormattedMetaData(formattedDF(column),lit(attributeDataType),lit(attributeConformance),lit(attributeMaxSize)))
}
formattedDF
}
def getFormattedMetaDataUdf (colValue:String,attributeDataType:String,attributeConformance:String,attributeMaxSize:String): String = {
var maxLength = 0
var formattedColValue = attributeConformance match {
case "optional" => {
if (attributeDataType == "string") {
maxLength = attributeMaxSize.toInt
}
if (colValue.length > maxLength) {
colValue.substring(0, maxLength) + "..."
}
else {
colValue
}
}
case _ => colValue
}
formattedColValue
}
}
I am getting the below error
java.lang.ClassCastException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 cannot be cast to scala.Function4
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:107)
at org.apache.spark.sql.UserDefinedFunction.apply(UserDefinedFunction.scala:50)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:55)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:78)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:80)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:82)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:84)
at $iwC$$iwC$$iwC.<init>(<console>:86)
at $iwC$$iwC.<init>(<console>:88)
at $iwC.<init>(<console>:90)
at <init>(<console>:92)
at .<init>(<console>:96)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)