
I'm trying to incorporate a Try().getOrElse() statement in the select statement for a Spark DataFrame. The project I'm working on will be applied to multiple environments, but each environment names the raw data slightly differently for ONLY one field. I don't want to write a separate function to handle each variant of the field. Is there an elegant way to handle exceptions, like the one below, in a DataFrame select statement?

val dfFilter = dfRaw
  .select(
   Try($"some.field.nameOption1").getOrElse($"some.field.nameOption2"),
   $"some.field.abc",
   $"some.field.def"
  )

dfFilter.show(33, false)

However, I keep getting the following error, which makes sense because the field does not exist in this environment's raw data, but I'd expect the getOrElse statement to catch that exception.

org.apache.spark.sql.AnalysisException: No such struct field nameOption1 in...

Is there a good way to handle exceptions in Scala Spark for select statements? Or will I need to code up different functions for each case?

fletchr
  • Related to [How do I detect if a Spark DataFrame has a column](https://stackoverflow.com/q/35904136/6910411) – zero323 Oct 13 '18 at 11:13

2 Answers

val selectedColumns = if (dfRaw.columns.contains("some.field.nameOption1")) $"some.field.nameOption1" else $"some.field.nameOption2"

val dfFilter = dfRaw
  .select(selectedColumns, ...)
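The same pick-the-first-existing-name idea generalizes to a preference list. A minimal plain-Scala sketch (no Spark here; `available` and the column names are made up stand-ins for `dfRaw.columns`):

```scala
// Pick the first candidate column name that actually exists in the data.
val available  = Seq("id", "name", "nameOption2")   // stand-in for dfRaw.columns
val candidates = Seq("nameOption1", "nameOption2")  // preferred order
val chosen: Option[String] = candidates.find(c => available.contains(c))
// chosen is Some("nameOption2"); in Spark you would feed it into $"..."
assert(chosen == Some("nameOption2"))
```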
Fermat's Little Student
  • That will work! Don't know why I didn't think of using a simple if/else. Do you know why Try().getOrElse does not work? I'm fairly new to Spark. Thanks. – fletchr Oct 12 '18 at 19:45
  • @fletchr because $"columnName" is just making a `Column` instance; it doesn't do the SQL Catalyst analysis. So the "Try" part will always succeed unless you pass in a `null`. – Fermat's Little Student Oct 12 '18 at 20:05
  • The analysis is deferred to your select; by that time Try has already finished execution. – Fermat's Little Student Oct 12 '18 at 20:06
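The two comments above can be sketched in plain Scala (no Spark; `colRef` is a hypothetical stand-in for `$"name"`, which only constructs a `Column` and never consults the schema):

```scala
import scala.util.Try

// Stand-in for $"name": constructing the reference never touches a schema,
// so it never throws -- just like building an unresolved Spark Column.
def colRef(name: String): String = name

// Try wraps only the construction, which always succeeds, so the
// fallback inside getOrElse is never consulted.
val picked = Try(colRef("nameOption1")).getOrElse(colRef("nameOption2"))
assert(picked == "nameOption1")

// The AnalysisException would only be thrown later, inside select(),
// long after Try has already returned Success.
assert(Try(colRef("nameOption1")).isSuccess)
```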

So I'm revisiting this question after a year. I believe this solution is much more elegant to implement. Please let me know your thoughts:

import scala.util.Try

// Generate a fake DataFrame
val df = Seq(
    ("1234", "A", "AAA"),
    ("1134", "B", "BBB"),
    ("2353", "C", "CCC")
    ).toDF("id", "name", "nameAlt")

// Extract the column names
val columns = df.columns

// Add a "new" column name that is NOT present in the above DataFrame
val columnsAdd = columns ++ Array("someNewColumn")

// Now "try" to select all of the columns: df(c) resolves the name against
// df's schema eagerly, so Try can catch the exception for a missing column
// (unlike $"c", which stays unresolved until the select)
df.select(columnsAdd.flatMap(c => Try(df(c)).toOption): _*).show(false)

// Let's reduce the DF again...should yield the same results
val dfNew = df.select("id", "name")
dfNew.select(columnsAdd.flatMap(c => Try(dfNew(c)).toOption): _*).show(false)

// Results
columns: Array[String] = Array(id, name, nameAlt)
columnsAdd: Array[String] = Array(id, name, nameAlt, someNewColumn)
+----+----+-------+
|id  |name|nameAlt|
+----+----+-------+
|1234|A   |AAA    |
|1134|B   |BBB    |
|2353|C   |CCC    |
+----+----+-------+
dfNew: org.apache.spark.sql.DataFrame = [id: string, name: string]
+----+----+
|id  |name|
+----+----+
|1234|A   |
|1134|B   |
|2353|C   |
+----+----+
fletchr