0

I am using Spark 1.5 CDH5.5

My requirement is to read each optional column from a DataFrame and check whether the length of that column's value exceeds the maxLimit; if it is greater than maxLimit, then I need to apply a substring to that column.

If the value of description is longer than 33 characters, I need to apply substring(0, 33). If the value of emr_id is longer than 34 characters, I need to apply substring(0, 34).

To achieve this, I need to pass some extra string arguments to a Spark UDF, but it throws the error below.

I am calling a udf for each column as below

Please help me on what went wrong?

 scala> updatedDF.printSchema
root
 |-- data_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- emr_id: string (nullable = true)


// Per-column metadata rules as (sourceName, targetName, dataType, conformance, maxSize).
// maxSize is kept as a String here; it is parsed with .toInt inside the UDF.
val METADATA_ATTRIBUTES_SIZE_SCHEMA =  List(
("data_id","data_id","string","mandatory","40"),
("description","description","string","optional","33"),
("emr_id","emr_id","string","optional","34")
)


// Apply the formatting rules. formatMetaData is defined on the MetaDataOperations
// trait — presumably made available on DataFrame via an implicit conversion
// declared elsewhere in the project (not shown here); confirm against that code.
 updatedDF = updatedDF.formatMetaData(METADATA_ATTRIBUTES_SIZE_SCHEMA)

I have the below trait which has lots of methods that can be directly invoked by any dataframe

trait MetaDataOperations {

  // The DataFrame this trait's operations act on; supplied by the implementor.
  val df: DataFrame

  /**
   * Formats each column of `df` according to the supplied size schema,
   * truncating optional string values that exceed their configured maximum.
   *
   * @param schema list of (sourceName, targetName, dataType, conformance, maxSize)
   *               tuples; maxSize is a stringified integer parsed inside the UDF.
   * @return a DataFrame with each schema-covered column formatted; columns with
   *         no matching schema entry are left untouched.
   */
  def formatMetaData(schema: List[(String, String, String, String, String)]): DataFrame = {
    // Wrap the plain Scala function as a Spark UDF once, outside the loop.
    val getFormattedMetaData = udf(getFormattedMetaDataUdf _)
    // Fold over the columns instead of mutating a var.
    df.columns.foldLeft(df) { (formattedDF, column) =>
      // find/headOption-style lookup: a column absent from the schema list is
      // skipped instead of blowing up with NoSuchElementException on .head.
      schema.find(_._1 == column) match {
        case Some((_, _, dataType, conformance, maxSize)) =>
          formattedDF.withColumn(
            column,
            getFormattedMetaData(formattedDF(column), lit(dataType), lit(conformance), lit(maxSize)))
        case None =>
          formattedDF
      }
    }
  }

  /**
   * Truncates an optional string value to its configured maximum size,
   * appending "..." when truncation occurs.
   *
   * @param colValue             the column value (may be null — the schema marks
   *                             every column nullable, so guard against NPE)
   * @param attributeDataType    e.g. "string"; non-string types get maxLength 0
   * @param attributeConformance "optional" triggers truncation; anything else
   *                             passes the value through unchanged
   * @param attributeMaxSize     stringified integer; .toInt throws
   *                             NumberFormatException on bad input
   * @return the (possibly truncated) value, or the input unchanged/null as-is
   */
  def getFormattedMetaDataUdf(colValue: String, attributeDataType: String, attributeConformance: String, attributeMaxSize: String): String = {
    attributeConformance match {
      // Guard on null: nullable columns reach the UDF as null and the original
      // code NPE'd on colValue.length.
      case "optional" if colValue != null =>
        val maxLength = if (attributeDataType == "string") attributeMaxSize.toInt else 0
        if (colValue.length > maxLength) colValue.substring(0, maxLength) + "..."
        else colValue
      case _ => colValue
    }
  }

}

I am getting the below error

java.lang.ClassCastException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 cannot be cast to scala.Function4
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:107)
    at org.apache.spark.sql.UserDefinedFunction.apply(UserDefinedFunction.scala:50)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:55)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:78)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:80)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:82)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:84)
    at $iwC$$iwC$$iwC.<init>(<console>:86)
    at $iwC$$iwC.<init>(<console>:88)
    at $iwC.<init>(<console>:90)
    at <init>(<console>:92)
    at .<init>(<console>:96)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
Surender Raja
  • 3,553
  • 8
  • 44
  • 80
  • Possible duplicate of [How to pass the parameter to User-Defined Function?](https://stackoverflow.com/questions/47260999/how-to-pass-the-parameter-to-user-defined-function) – Jacek Laskowski Nov 13 '17 at 14:58
  • @Jacek : I applied lit() before I pass them to udf, I don't know why I get this error . – Surender Raja Nov 13 '17 at 15:11

1 Answers1

0

getFormattedMetaDataUdf is not a UDF. It is a standard Scala Function4. You have to use the udf wrapper:

 val  getFormattedMetaDataUdf = udf(getFormattedMetaData _)
Alper t. Turker
  • 34,230
  • 9
  • 83
  • 115