I need to correct some spellings using Spark. Unfortunately, a naive approach like

val misspellings3 = misspellings1
    .withColumn("A", when('A === "error1", "replacement1").otherwise('A))
    .withColumn("A", when('A === "error2", "replacement2").otherwise('A))
    .withColumn("B", when(('B === "conditionC") and ('D === "condition3"), "replacementC").otherwise('B))

does not work with Spark. How can I add new columns based on conditions without running into a JaninoRuntimeException or OutOfMemoryError?

The simple cases (the first 2 examples) can nicely be handled via

val spellingMistakes = Map(
    "error1" -> "fix1"
  )

  val spellingNameCorrection: (String => String) = (t: String) => {
    spellingMistakes.get(t) match {
      case Some(tt) => tt // correct spelling
      case None => t // keep original
    }
  }
  val spellingUDF = udf(spellingNameCorrection)

  val misspellings1 = hiddenSeasonalities
    .withColumn("A", spellingUDF('A))

But I am unsure how to handle the more complex / chained conditional replacements in a UDF in a nice and generalizable manner. If it is only a rather small list of spellings (fewer than 50), would you suggest hard-coding them within a UDF?

Georg Heiler

3 Answers


You can make the UDF receive more than one column:

val spellingCorrection2 = udf((x: String, y: String) => if (x == "conditionC" && y == "conditionD") "replacementC" else x)
val misspellings3 = misspellings1.withColumn("B", spellingCorrection2($"B", $"C"))

To make this more generalized, you can use a map from a tuple of the two conditions to a replacement string, the same as you did for the first case.
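A minimal sketch of that idea, assuming your column names and a hypothetical `correctionMap` (not from the original answer):

```scala
import org.apache.spark.sql.functions.udf

// Hypothetical map from a (B, C) value pair to its replacement.
val correctionMap = Map(
  ("conditionC", "conditionD") -> "replacementC"
)

// Fall back to the original value of B when the pair is not in the map.
val spellingCorrection2 = udf((x: String, y: String) =>
  correctionMap.getOrElse((x, y), x))

val misspellings3 = misspellings1.withColumn("B", spellingCorrection2($"B", $"C"))
```

Adding a new conditional replacement then only means adding an entry to the map, not writing a new UDF.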

If you want to generalize it even more, you can use Dataset mapping. Basically, create a case class with the relevant columns and then use as to convert the dataframe to a Dataset of the case class. Then use the Dataset's map and, inside it, pattern match on the input data to generate the relevant corrections, and convert back to a dataframe. This should be easier to write but comes with a performance cost.
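A hedged sketch of the Dataset approach, assuming a hypothetical case class matching the columns from the question (an active SparkSession `spark` is assumed for the implicits):

```scala
// Hypothetical schema for the relevant columns A, B, D.
case class Record(A: String, B: String, D: String)

import spark.implicits._

val corrected = misspellings1.as[Record].map {
  // chained conditional replacement on B, guarded by D
  case Record(a, "conditionC", "condition3") => Record(a, "replacementC", "condition3")
  // simple spelling fix on A
  case Record("error1", b, d)                => Record("replacement1", b, d)
  // keep everything else as-is
  case r                                     => r
}.toDF()
```

Pattern matching keeps arbitrarily complex, chained conditions readable, at the cost of the encoder round-trip.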

Assaf Mendelson

For now I will go with the following, which seems to work just fine and is more understandable: https://gist.github.com/rchukh/84ac39310b384abedb89c299b24b9306

Georg Heiler

Suppose spellingMap is the map containing the correct spellings, and df is the dataframe:

val df: DataFrame = ??? // your dataframe
val spellingMap = Map.empty[String, String] // fill it up yourself
val columnsWithSpellingMistakes = List("abc", "def")

Write a UDF like this:

def spellingCorrectionUDF(spellingMap: Map[String, String]) =
  udf((value: String) => {
    if (spellingMap.contains(value)) spellingMap(value)
    else value
  })

And finally, you can apply it like this:

val newColumns = df.columns.map { columnName =>
  if (columnsWithSpellingMistakes.contains(columnName)) spellingCorrectionUDF(spellingMap)(col(columnName)).as(columnName)
  else col(columnName)
}
df.select(newColumns: _*)
Debasish
  • Indeed, but your function is basically already shown above in my question. For now, I will go with the linked solution because of https://issues.apache.org/jira/browse/SPARK-18532 – Georg Heiler Nov 22 '16 at 18:39