
Suppose I have a Spark DataFrame (in Scala) like

+---+---+---------------+
|  a|  b|           expr|
+---+---+---------------+
|  0|  0|a = 1 AND b = 0|
|  0|  1|          a = 0|
|  1|  0|a = 1 AND b = 1|
|  1|  1|a = 1 AND b = 1|
|  1|  1|           null|
|  1|  1| a = 0 OR b = 1|
+---+---+---------------+

in which the string column expr contains nullable Boolean expressions that refer to the other numeric columns in the same DataFrame (a and b).

I would like to derive a column eval(expr) that evaluates the Boolean expression expr row-wise, i.e.,

+---+---+---------------+----------+
|  a|  b|           expr|eval(expr)|
+---+---+---------------+----------+
|  0|  0|a = 1 AND b = 0|     false|
|  0|  1|          a = 0|      true|
|  1|  0|a = 1 AND b = 1|     false|
|  1|  1|a = 1 AND b = 1|      true|
|  1|  1|           null|      true|
|  1|  1| a = 0 OR b = 1|      true|
+---+---+---------------+----------+

(In particular, null should evaluate to true, though this requirement is optional.)
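To pin down the intended semantics, here is a plain-Scala reference evaluator (no Spark). It is only a sketch: it supports just the flat grammar used in the example (equality tests on `a` and `b` joined by AND/OR, with SQL's precedence of AND over OR), and the name `evalExpr` is illustrative, not part of any solution below.

```scala
// Reference semantics only: split on OR first (lower precedence), then AND.
def evalExpr(a: Int, b: Int, expr: String): Boolean =
  if (expr == null) true // null evaluates to true, per the spec above
  else expr.split(" OR ").exists { disjunct =>
    disjunct.split(" AND ").forall { conjunct =>
      // Each conjunct looks like "a = 1" or "b = 0".
      val parts = conjunct.split("=").map(_.trim)
      val lhs = if (parts(0) == "a") a else b
      lhs == parts(1).toInt
    }
  }
```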

Question

What's the best way to create eval(expr)?

That is, how can I create a column in a Spark DataFrame that evaluates a column of Boolean expressions that refer to other columns in the DataFrame?


I have some not-quite-satisfactory solutions below. They assume the following DataFrame in scope:

// Assumes a SparkSession named `spark` in scope (as in spark-shell)
import spark.implicits._
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

val df: DataFrame = Seq(
  (0, 0, "a = 1 AND b = 0"),
  (0, 1, "a = 0"),
  (1, 0, "a = 1 AND b = 1"),
  (1, 1, "a = 1 AND b = 1"),
  (1, 1, null),
  (1, 1, "a = 0 OR b = 1")
).toDF("a", "b", "expr")

Solution 1

Create a large global expression out of the individual expressions:

val exprs: Column = concat(
  df.columns
    .filter(_ != "expr")
    .zipWithIndex
    .flatMap {
      case (name, i) =>
        if (i == 0)
          Seq(lit(s"($name = "), col(name))
        else
          Seq(lit(s" AND $name = "), col(name))
    } :+ lit(" AND (") :+ col("expr") :+ lit("))"): _*
)
// exprs: org.apache.spark.sql.Column = concat((a = , a,  AND b = , b,  AND (, expr, )))

val bigExprString = df.select(exprs).na.drop.as[String].collect.mkString(" OR ")
// bigExprString: String = (a = 0 AND b = 0 AND (a = 1 AND b = 0)) OR (a = 0 AND b = 1 AND (a = 0)) OR (a = 1 AND b = 0 AND (a = 1 AND b = 1)) OR (a = 1 AND b = 1 AND (a = 1 AND b = 1)) OR (a = 1 AND b = 1 AND (a = 0 OR b = 1))

val result: DataFrame = df.withColumn("eval(expr)", expr(bigExprString))

The downside here is that the resulting string is very large. In my actual use case, it would be many tens of thousands of characters long, if not longer, and I'm not sure whether an expression that large would cause problems.
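To make the size concern concrete, the guard-string construction can be reproduced in plain Scala (no Spark) from the sample rows; each non-null row contributes one guard, so the string grows linearly with the row count:

```scala
// Plain-Scala sketch of the guard-string construction from Solution 1.
val rows = Seq(
  (0, 0, "a = 1 AND b = 0"),
  (0, 1, "a = 0"),
  (1, 0, "a = 1 AND b = 1"),
  (1, 1, "a = 1 AND b = 1"),
  (1, 1, null),
  (1, 1, "a = 0 OR b = 1")
)

val bigExprString = rows
  .filter(_._3 != null) // mirrors na.drop
  .map { case (a, b, e) => s"(a = $a AND b = $b AND ($e))" }
  .mkString(" OR ")

println(bigExprString.length) // length grows linearly with the number of rows
```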

Solution 2

Separate the DataFrame into multiple based on the value of the expression column, operate on each individually, and recombine into one DataFrame.

val exprs: Seq[String] = df.select("expr").distinct.as[String].collect
// exprs: Seq[String] = WrappedArray(a = 1 AND b = 1, a = 1 AND b = 0, null, a = 0, a = 0 OR b = 1)

val result: DataFrame = exprs.map(e =>
  // Note: col("expr") === e never matches when e is null (the comparison
  // yields null), so the null group needs an explicit isNull filter.
  df.filter(if (e == null) col("expr").isNull else col("expr") === e)
    .withColumn("eval(expr)", if (e == null) lit(true) else when(expr(e), true).otherwise(false))
).reduce(_.union(_))

result.show()

I think the downside of this approach is that it creates many intermediate tables (one for each distinct expression). In my actual use case, this count is potentially hundreds or thousands.

Artem Mavrin

1 Answer


Following this answer, a scala.tools.reflect.ToolBox can be used to evaluate each expression after transforming it into a valid Scala expression:

case class Result(a: Integer, b: Integer, expr: String, result: Boolean)

df.mapPartitions(it => {
  import scala.reflect.runtime.universe
  import scala.tools.reflect.ToolBox
  // Create the toolbox once per partition, not once per row.
  val tb = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()
  it.map(r => {
    val a = r.getInt(0)
    val b = r.getInt(1)
    val expr = r.getString(2)
    val exprResult =
      if (expr == null) {
        true // null evaluates to true, per the question
      } else {
        // Rewrite the SQL syntax into a Scala expression, bind a and b,
        // then compile and evaluate it with the toolbox.
        val scalaExpr = expr.replace("=", "==").replace("AND", "&").replace("OR", "|")
        val scalaExpr2 = s"var a=${a}; var b=${b}; ${scalaExpr}"
        tb.eval(tb.parse(scalaExpr2)).asInstanceOf[Boolean]
      }
    Result(a, b, expr, exprResult)
  })
}).show()

Output:

+---+---+---------------+------+
|  a|  b|           expr|result|
+---+---+---------------+------+
|  0|  0|a = 1 AND b = 0| false|
|  0|  1|          a = 0|  true|
|  1|  0|a = 1 AND b = 1| false|
|  1|  1|a = 1 AND b = 1|  true|
|  1|  1|           null|  true|
|  1|  1| a = 0 OR b = 1|  true|
+---+---+---------------+------+

I am using mapPartitions here instead of a simple udf because the initialization of the toolbox takes some time. Instead of being initialized once per row, it is now initialized only once per partition.
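The textual rewriting step can be checked in isolation (plain Scala, no Spark). Note that Scala's & and | are non-short-circuiting boolean operators, which is fine here, but the rewrite is fragile if =, AND, or OR could ever appear inside identifiers or string literals:

```scala
// The SQL -> Scala operator rewrite used above, extracted for inspection.
def toScalaExpr(sqlExpr: String): String =
  sqlExpr.replace("=", "==").replace("AND", "&").replace("OR", "|")

println(toScalaExpr("a = 1 AND b = 0")) // a == 1 & b == 0
```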

werner