-3

I have two dataframes like below,Im reading logic DF from MySQL Table

Logic DF:

slNo | filterCondtion |
-----------------------
1    | age > 100      |
2    | age > 50       |
3    | age > 10       |
4    | age > 20       |

InputDF - reading from File:

age   | name           |
------------------------
11    | suraj          |
22    | surjeth        |
33    | sam            |
43    | ram            |

I want to apply a filter statement from logic data frame and add count of those filter

result output:

slNo | filterCondtion | count |
------------------------------
1    | age > 100      |   10  |
2    | age > 50       |   2   |
3    | age > 10       |   5   |
4    | age > 20       |   6   |
-------------------------------

code which I have tried:

val LogicDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/testDB").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "logic_table").option("user", "root").option("password", "password").load()

def filterCount(str: String): Long ={
     val counte = inputDF.where(str).count()
counte
}

val filterCountUDF = udf[Long, String](filterCount)

LogicDF.withColumn("count",filterCountUDF(col("filterCondtion")))

Error trace:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string) => bigint)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
  at org.apache.spark.sql.Dataset.where(Dataset.scala:1525)
  at filterCount(<console>:28)
  at $anonfun$1.apply(<console>:25)
  at $anonfun$1.apply(<console>:25)
  ... 21 more

any alternative is also fine ..! thanks in advance.

Learnis
  • 526
  • 5
  • 25
  • are you sure that `inputDF` is not null? – UninformedUser Apr 26 '20 at 13:17
  • yes. InputDF is not null. It is working, if create a DF like this ```val logicDF = Seq( (1, "age > 18"), (2, "age < 18") ).toDF("slno", "filterCondtion")``` but when apply something on DF read from MySQL..It is causing this issue. – Learnis Apr 26 '20 at 13:20
  • hm, not sure if you can use a another dataframe inside an UDF. see also https://stackoverflow.com/questions/50123238/pyspark-use-dataframe-inside-udf and https://stackoverflow.com/questions/41390572/how-to-reference-a-dataframe-when-in-an-udf-on-another-dataframe – UninformedUser Apr 26 '20 at 13:26
  • Ok, is there any way to solve this problem ? – Learnis Apr 26 '20 at 13:38
  • please, show schema DataFrames after read data with MySQL. – mvasyliv Apr 26 '20 at 14:02
  • ```root |-- slNo: string (nullable = true) |-- filterCondition: string (nullable = true)``` – Learnis Apr 26 '20 at 14:11
  • and schema to InputDF, pls? – mvasyliv Apr 26 '20 at 14:15
  • inputDF, im reading it from the file. ``` root |-- age: integer (nullable = false) |-- name: string (nullable = true) ``` – Learnis Apr 26 '20 at 14:17
  • Both schema and Dataframe are same. – Learnis Apr 26 '20 at 14:19
  • no. LogicDF.withColumn("count",filterCountUDF(col("filterCondtion"))) in your code -- filterCondition: string (nullable = true - in your schema filterCondtion <> filterCondition, diff = "i" please, try my code. – mvasyliv Apr 26 '20 at 14:43
  • .filter('filterCondition.like("%age%")) - please, try use. I edit my code – mvasyliv Apr 26 '20 at 15:25

2 Answers2

0

Solution without UDFs

This will work as long as your logicDF is small enough to be collected into the driver.

Step 1

Collect your logic into an Array[(Int, String)], as:

val rules = logicDF.collect().map{ r: Row =>
  val slNo = r.getAs[Int](0)
  val condition = r.getAs[String](1)
  (slNo, condition)
}

Step 2

Build a new column with conditional values chaining those rules into a when Column. To do this, use some scala looping like:

val unused = when(lit(false), lit(false))
val filters: Column = rules.foldLeft(unused){
  case (acc: Column, (slNo: Int, cond: String)) =>
    acc.when(col("slNo") === slNo, expr(cond))
}

//You will get something like:
//when(col("slNo") === 1, expr("age > 10"))
//.when(col("slNo") === 2, expr("age > 20"))
//...

Step 3

Get the cartesian product of both DataFrames with a join, so you can apply every rule to every row in your data:

val joinDF = logicDF.join(inputDF, lit(true), "inner") //inner or whatever

Step 4

Filter using the previous Column with conditional filters.

val withRulesDF = joinDF.filter(filters)

Step 5

Group and count:

val resultDF = withRulesDF
  .groupBy("slNo", "filterCondtion")
  .agg(count("*") as "count")
Civyshk
  • 194
  • 1
  • 7
-3
package spark

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object LogicFilterDataFrame extends App {
  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  case class LogicFilter(slNo: Int, filterCondition: String)
  case class Data(age: Int, name:String)

  val logicDF = Seq(
    LogicFilter(1, "age > 100"),
    LogicFilter(2, "age > 50"),
    LogicFilter(3, "age > 10"),
    LogicFilter(4, "age > 20")
  ).toDF()

  val dataDF = Seq(
    Data(11, "suraj"),
    Data(22, "surjeth"),
    Data(33, "sam"),
    Data(43, "ram")
  ).toDF()

  val logicCount = udf{s: String => {
    dataDF.filter(s).count()
    }}
  val resDF = logicDF.filter('filterCondition.like("%age%")).withColumn("count", logicCount('filterCondition))
  resDF.show(false)

}
mvasyliv
  • 1,214
  • 6
  • 10
  • Thanks. I have tried creating the DF inside the code using implicit ```val logicDF = Seq( (1, "age > 18"), (2, "age < 18") ).toDF("slno", "filterCondtion")``` It worked for mee too. but I'm getting the issue, when read the logicDF from MySQL DB.. – Learnis Apr 26 '20 at 14:00
  • LogicDF.withColumn("count",filterCountUDF(col("filterCondtion"))) in your code -- filterCondition: string (nullable = true - in your schema – mvasyliv Apr 26 '20 at 14:40
  • I have tried your code as well, It is working if i create a logicDF with implicit's, but if i try to read from MySQL, I'm getting that null pointer exception. FYI even in your code also filterCondition is string and its nullable true – Learnis Apr 26 '20 at 15:03
  • also, we need checking data with MySQL in field filterCondition. – mvasyliv Apr 26 '20 at 15:14