0

I have a dataframe like

scala> testDf.show()
+------+--------+---------+------------+----------------------------------------+
|    id|    item|    value|  value_name|                               condition|
+------+--------+---------+------------+----------------------------------------+
|    11|    3210|        0|         OFF|                                value==0|
|    12|    3210|        1|         OFF|                                value==0|
|    13|    3210|        0|         OFF|                                value==0|
|    14|    3210|        0|         OFF|                                value==0|
|    15|    3210|        1|         OFF|                                value==0|
|    16|    5440|        5|          ON|                     value>0 && value<10|
|    17|    5440|        0|          ON|                     value>0 && value<10|
|    18|    5440|        6|          ON|                     value>0 && value<10|
|    19|    5440|        7|          ON|                     value>0 && value<10|
|    20|    5440|        0|          ON|                     value>0 && value<10|
|    21|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    22|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    23|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    24|    7780|        C|        TYPE|   Set("A","B").contains(value.toString)|
|    25|    7780|        C|        TYPE|   Set("A","B").contains(value.toString)|
+------+--------+---------+------------+----------------------------------------+

scala> testDf.printSchema
root
 |-- id: string (nullable = true)
 |-- item: string (nullable = true)
 |-- value: string (nullable = true)
 |-- value_name: string (nullable = true)
 |-- condition: string (nullable = true)

I want to remove some rows with 'condition' column. But I am in trouble.

I tried with below test code. But it does not seem to work properly.

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.Row
import scala.collection.mutable

val encoder = RowEncoder(testDf.schema);

testDf.flatMap(row => {
  val result = new mutable.MutableList[Row];
  val setting_value = row.getAs[String]("setting_value").toInt
  val condition = row.getAs[String]("condition").toBoolean
  if (condition){
      result+=row;
  };
  result;
})(encoder).show();

And This is error.

19/05/30 02:04:31 ERROR TaskSetManager: Task 0 in stage 267.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 267.0 failed 4 times, most recent failure: Lost task 0.3 in stage 267.0 (TID 3763, .compute.internal, executor 1): java.lang.IllegalArgumentException: For input string: "setting_value==0"
        at scala.collection.immutable.StringLike$class.parseBoolean(StringLike.scala:291)
        at scala.collection.immutable.StringLike$class.toBoolean(StringLike.scala:261)
        at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:29)
        at $anonfun$1.apply(<console>:40)
        at $anonfun$1.apply(<console>:37)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I want to keep rows that match the value of the condition column. This is the desired result.

+------+--------+---------+------------+----------------------------------------+
|    id|    item|    value|  value_name|                               condition|
+------+--------+---------+------------+----------------------------------------+
|    11|    3210|        0|         OFF|                                value==0|
|    13|    3210|        0|         OFF|                                value==0|
|    14|    3210|        0|         OFF|                                value==0|
|    16|    5440|        5|          ON|                     value>0 && value<10|
|    18|    5440|        6|          ON|                     value>0 && value<10|
|    19|    5440|        7|          ON|                     value>0 && value<10|
|    21|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    22|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    23|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
+------+--------+---------+------------+----------------------------------------+

Please help me if you have a good idea. Thanks.

sproutee
  • 25
  • 6

2 Answers2

1

In the above case, Spark is trying to convert the String value to Boolean. It is not evaluating the expression itself.
And expression evaluation has to be done by the user using an external library or custom code.
The closest(Not the exact scenario though) I could come up is
How to evaluate a math expression given in string form? .

DaRkMaN
  • 1,014
  • 6
  • 9
1

Here is one way using scala reflection API withing a UDF function. The udf handles both cases for int and string values:

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

val tb = currentMirror.mkToolBox()

val df = Seq(("0","value==0"),
("1", "value==0"),
("6", """value>0 && value<10"""),
("7", """value>0 && value<10"""),
("0", """value>0 && value<10"""),
("A", """Set("A","B").contains(value.toString)"""),
("C", """Set("A","B").contains(value.toString)""")).toDF("value", "condition")

def isAllDigits(x: String) = x.forall(Character.isDigit)

val evalExpressionUDF = udf((value: String, expr: String) => {
  val result =  isAllDigits(value) match {
    case true => tb.eval(tb.parse(expr.replace("value", s"""${value.toInt}""")))
    case false => tb.eval(tb.parse(expr.replace("value", s""""${value}"""")))
  }

  result.asInstanceOf[Boolean]
})

df.withColumn("eval", evalExpressionUDF($"value", $"condition"))
  .where($"eval" === true)
  .show(false)

Cases for evalExpressionUDF:

  • int: replace expression with the actual int value then execute string code with mkToolBox
  • string: enclose string value into "" then replace expression with the double quoted string and execute the string code

Output:

+-----+-------------------------------------+----+ 
|value|                           condition |eval| 
+-----+-------------------------------------+----+ 
|0    |value==0                             |true| 
|6    |value>0 && value<10                  |true| 
|7    |value>0 && value<10                  |true| 
|A    |Set("A","B").contains(value.toString)|true| 
+-----+-------------------------------------+----+

PS: I know that the performance of the above solution may be bad since it invokes reflection although I am not aware of an alternative.

abiratsis
  • 7,051
  • 3
  • 28
  • 46
  • Thank you very much.Your answer is very helpful. But I get an error below. `scala> testDf.withColumn("eval", evalExpressionUDF($"value", $"condition")).show() ` Error : ` org.apache.spark.SparkException: Task not serializable Caused by: java.io.NotSerializableException: scala.tools.reflect.ToolBoxFactory$ToolBoxImpl Serialization stack: - object not serializable (class: scala.tools.reflect.ToolBoxFactory$ToolBoxImpl, value: scala.tools.reflect.ToolBoxFactory$ToolBoxImpl@7341617c) ` – sproutee May 30 '19 at 23:36
  • Hi there, what Spark version are you running? and what environment `spark-shell`,`databricks`? – abiratsis May 30 '19 at 23:46
  • In spark cluster environment of multi node, it seems that error occurs. Is there a way around this? – sproutee May 30 '19 at 23:47
  • I use spark-shell. and my spark version is 2.4.0. Scala version is 2.11.12. – sproutee May 30 '19 at 23:48
  • I just ran it in local mode and I get the same error. – sproutee May 31 '19 at 00:06
  • My testDF is processed several times from the source data. If I declare sample dataframe with `val df = Seq` and execute, error does not occur. – sproutee May 31 '19 at 00:10
  • can you try to add this import `import scala.reflect.runtime.universe._`? – abiratsis May 31 '19 at 00:26
  • so if you run the example as I did above it works but in your case your executing more functionality with that dataframe correct? – abiratsis May 31 '19 at 00:38
  • Yes. I create testDf by joining some of the source data. There is no use other function than join query using 'spark.sql'. – sproutee May 31 '19 at 00:44
  • I am afraid i can t help without being able to reproduce it although it is certainly related to `val tb = currentMirror.mkToolBox()` and the fact that Spark cant serialize the instance of ToolBoxImpl. Please try one more chamge to replace the previous row with `import scala.reflect.runtime.universe val cm = universe.runtimeMirror(getClass.getClassLoader) val tb = cm.mkToolBox()` – abiratsis May 31 '19 at 01:09
  • I used universe.runtimeMirror and I get the same error. I found another article, do I need to override udf Function1? https://stackoverflow.com/questions/38848847/define-spark-udf-by-reflection-on-a-string – sproutee May 31 '19 at 01:16
  • yes that seems reasonable. One solution could be move the class loader inside the udf: `val evalExpressionUDF = udf((value: String, expr: String) => { val cm = universe.runtimeMirror(getClass.getClassLoader) val tb = cm.mkToolBox() val result = isAllDigits(value) match { case true => tb.eval(tb.parse(expr.replace("value", s"""${value.toInt}"""))) case false => tb.eval(tb.parse(expr.replace("value", s""""${value}""""))) } result.asInstanceOf[Boolean] })` – abiratsis May 31 '19 at 01:28
  • After move the class loader , I get same error. But error stack is little different. `org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403) ... 51 elided Caused by: java.io.NotSerializableException: scala.reflect.runtime.JavaMirrors$JavaMirror Serialization stack: - object not serializable (class: scala.reflect.runtime.JavaMirrors$JavaMirror, value: JavaMirror with scala.tools.nsc.interpreter.IMain$TranslatingClassLoader@476c137b of type class scala.tools.nsc.interpreter ` – sproutee May 31 '19 at 01:41
  • ok we opened with gates of hell with Spark and reflection together! As you pointed out try to override UDF as shown in the link you sent above https://stackoverflow.com/questions/38848847/define-spark-udf-by-reflection-on-a-string. I will be on vacation and not able to help any further if the problem remains please consider to post another question. Good luck – abiratsis May 31 '19 at 08:23
  • Hello, I have tried to override UDF. But I cant find perfect solution yet. T.T – sproutee Jun 08 '19 at 08:08
  • If you could send that part of code it would be very helpful n order to be synced – abiratsis Jun 08 '19 at 08:25