Suppose I have a Spark DataFrame (in Scala) like
+---+---+---------------+
|  a|  b|           expr|
+---+---+---------------+
|  0|  0|a = 1 AND b = 0|
|  0|  1|          a = 0|
|  1|  0|a = 1 AND b = 1|
|  1|  1|a = 1 AND b = 1|
|  1|  1|           null|
|  1|  1| a = 0 OR b = 1|
+---+---+---------------+
in which the string column expr contains nullable Boolean expressions that refer to the other numeric columns in the same DataFrame (a and b).
I would like to derive a column eval(expr) that evaluates the Boolean expression expr row-wise, i.e.,
+---+---+---------------+----------+
|  a|  b|           expr|eval(expr)|
+---+---+---------------+----------+
|  0|  0|a = 1 AND b = 0|     false|
|  0|  1|          a = 0|      true|
|  1|  0|a = 1 AND b = 1|     false|
|  1|  1|a = 1 AND b = 1|      true|
|  1|  1|           null|      true|
|  1|  1| a = 0 OR b = 1|      true|
+---+---+---------------+----------+
(in particular, a null expr evaluates to true, although this requirement is optional).
Question
What's the best way to create eval(expr)?
That is, how can I create a column in a Spark DataFrame that evaluates a column of Boolean expressions that refer to other columns in the DataFrame?
I have some not-quite-satisfactory solutions below. They assume the following DataFrame in scope:
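import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

val df: DataFrame = Seq(
  (0, 0, "a = 1 AND b = 0"),
  (0, 1, "a = 0"),
  (1, 0, "a = 1 AND b = 1"),
  (1, 1, "a = 1 AND b = 1"),
  (1, 1, null),
  (1, 1, "a = 0 OR b = 1")
).toDF("a", "b", "expr")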
Solution 1
Create a large global expression out of the individual expressions:
val exprs: Column = concat(
  df.columns
    .filter(_ != "expr")
    .zipWithIndex
    .flatMap {
      case (name, i) =>
        if (i == 0)
          Seq(lit(s"($name = "), col(name))
        else
          Seq(lit(s" AND $name = "), col(name))
    } :+ lit(" AND (") :+ col("expr") :+ lit("))"): _*
)
// exprs: org.apache.spark.sql.Column = concat((a = , a, AND b = , b, AND (, expr, )))
val bigExprString = df.select(exprs).na.drop.as[String].collect.mkString(" OR ")
// bigExprString: String = (a = 0 AND b = 0 AND (a = 1 AND b = 0)) OR (a = 0 AND b = 1 AND (a = 0)) OR (a = 1 AND b = 0 AND (a = 1 AND b = 1)) OR (a = 1 AND b = 1 AND (a = 1 AND b = 1)) OR (a = 1 AND b = 1 AND (a = 0 OR b = 1))
val result: DataFrame = df.withColumn("eval(expr)", expr(bigExprString))
The downside here is that the resulting string is very large. In my actual use case it would be many tens of thousands of characters long, if not longer, and I'm not sure whether a string of that size would cause problems.
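To make the shape of that string concrete, here is a minimal plain-Scala sketch (no Spark required) that mirrors the concatenation above on the sample rows. The names Row3 and clause are hypothetical helpers introduced only for this illustration, and Option stands in for the nullable expr column.

```scala
// Plain-Scala mirror of Solution 1's string construction.
// Row3 and clause are illustrative names, not part of any Spark API.
final case class Row3(a: Int, b: Int, expr: Option[String])

val rows: Seq[Row3] = Seq(
  Row3(0, 0, Some("a = 1 AND b = 0")),
  Row3(0, 1, Some("a = 0")),
  Row3(1, 0, Some("a = 1 AND b = 1")),
  Row3(1, 1, Some("a = 1 AND b = 1")),
  Row3(1, 1, None), // the null expression
  Row3(1, 1, Some("a = 0 OR b = 1"))
)

// One clause per row: pin the row's own a and b, then AND in its expression.
// Rows without an expression produce no clause (this mirrors na.drop).
def clause(r: Row3): Option[String] =
  r.expr.map(e => s"(a = ${r.a} AND b = ${r.b} AND ($e))")

// Join all clauses with OR, as mkString(" OR ") does in Solution 1.
val bigExprString: String = rows.flatMap(clause).mkString(" OR ")

// The string grows by one clause per row with a non-null expression.
println(bigExprString.length)
```

The sketch also makes visible that the combined predicate refers only to a and b, so rows that share the same (a, b) but carry different expressions would receive the same value. On the sample data the null row comes out true only because another row with a = 1 and b = 1 contributes a clause that is true.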
Solution 2
Split the DataFrame into several DataFrames based on the value of the expression column, operate on each one individually, and recombine them into one DataFrame.
val exprs: Seq[String] = df.select("expr").distinct.as[String].collect
// exprs: Seq[String] = WrappedArray(a = 1 AND b = 1, a = 1 AND b = 0, null, a = 0, a = 0 OR b = 1)
val result: DataFrame = exprs.map(e =>
  // <=> is null-safe equality, so the null expression also selects its rows
  df.filter(col("expr") <=> e)
    .withColumn("eval(expr)", if (e == null) lit(true) else when(expr(e), true).otherwise(false))
).reduce(_.union(_))
result.show()
I think the downside of this approach is that it creates many intermediate tables (one for each distinct expression). In my actual use case, this count is potentially hundreds or thousands.
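For comparison, a possible variant of Solution 2 keeps the "one branch per distinct expression" idea but folds the branches into a single column instead of filtering and unioning. This is only a sketch under assumptions: the local SparkSession settings are placeholders, the names distinctExprs and evalCol are introduced here for illustration, and I have not checked how a nested when chain behaves with hundreds or thousands of branches.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, expr, lit, when}

// Local session for illustration only; master and app name are placeholders.
val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("eval-expr-sketch")
  .getOrCreate()
import spark.implicits._

val df: DataFrame = Seq[(Int, Int, String)](
  (0, 0, "a = 1 AND b = 0"),
  (0, 1, "a = 0"),
  (1, 0, "a = 1 AND b = 1"),
  (1, 1, "a = 1 AND b = 1"),
  (1, 1, null),
  (1, 1, "a = 0 OR b = 1")
).toDF("a", "b", "expr")

// Collect the distinct non-null expression strings to the driver.
val distinctExprs: Seq[String] =
  df.select("expr").na.drop().distinct().as[String].collect().toSeq

// Fold the expressions into one nested CASE WHEN. A row whose expr is null
// matches no branch and falls through to the innermost default, lit(true).
val evalCol: Column = distinctExprs.foldLeft(lit(true)) { (acc, e) =>
  when(col("expr") === e, expr(e)).otherwise(acc)
}

// A single projection over df; no per-expression filter or union is needed.
val result: DataFrame = df.withColumn("eval(expr)", evalCol)
result.show()
```

This avoids the intermediate DataFrames, but it trades them for one deeply nested expression, which may have its own limits as the number of distinct expressions grows.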