We are trying to use a dynamic filter in a Spark Structured Streaming application.
Let's say we have the following pseudo-implementation of a Spark Structured Streaming application:
spark.readStream()
    .format("kafka")
    .option(...)
    ...
    .load()
    .filter(getFilter()) // dynamic part - filter(String conditionExpr)
    .writeStream()
    .format("kafka")
    .option(...)
    .start();
and getFilter() returns a String:
String getFilter() {
    // dynamic stuff to create the expression
    return expression; // e.g. "column = true"
}
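For reference, here is a minimal runnable version of the sketch above. The broker address, topic names, checkpoint path, and the example expression are all placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class DynamicFilterApp {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("dynamic-filter")
                .getOrCreate();

        Dataset<Row> input = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092") // placeholder
                .option("subscribe", "input-topic")               // placeholder
                .load();

        StreamingQuery query = input
                .filter(getFilter()) // the expression string is captured here, once
                .writeStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092") // placeholder
                .option("topic", "output-topic")                  // placeholder
                .option("checkpointLocation", "/tmp/checkpoints") // placeholder
                .start();

        query.awaitTermination();
    }

    static String getFilter() {
        // dynamic stuff to create the expression
        return "column = true"; // example expression
    }
}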
Is it possible, in the current version of Spark, to have a dynamic filter condition? That is, the getFilter() method should dynamically return a filter condition (let's say refreshed every 10 minutes). We tried to look into broadcast variables, but we are not sure whether Structured Streaming supports such a thing.
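For illustration, this is roughly what we tried with a broadcast variable (input is the source Dataset from the snippet above). As far as we can tell, the value is read once when the query is planned, and a broadcast is immutable after creation, so re-broadcasting later has no effect on the running query:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

// simplified sketch of our broadcast attempt
Broadcast<String> filterExpr =
        new JavaSparkContext(spark.sparkContext()).broadcast(getFilter());

// filterExpr.value() is evaluated here, once, when the plan is built,
// so the condition never changes for the lifetime of the query
Dataset<Row> filtered = input.filter(filterExpr.value());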
It looks like it's not possible to update a job's configuration once it has been submitted. We deploy on YARN.
Any suggestion/option is highly appreciated.
EDIT:
Assume getFilter() returns:

(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8
After 10 minutes there can be a small change (the first expression before the first OR is dropped), and potentially we can have a new expression (columnA = 2), e.g.:

customHiveUDF(columnC, 'input') != 'required' OR columnD > 10 OR columnA = 2
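For the expression string above to parse, customHiveUDF has to be resolvable in the session. In our case it is a Hive UDF, but a minimal illustrative registration through the Spark UDF registry (the signature is an assumption) would look like:

import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;

// illustrative only - the real function is a Hive UDF; signature is assumed
spark.udf().register("customHiveUDF",
        (UDF2<String, String, String>) (columnValue, input) -> {
            // ... custom logic ...
            return "required"; // placeholder result
        },
        DataTypes.StringType);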
The goal is to have multiple filters in one Spark application without submitting multiple jobs, as in the sketch below.
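Conceptually, the behaviour we are after is a driver-side rebuild of the expression every 10 minutes, roughly like this. We understand that a plain filter(String) does not pick this up once the query has started, which is exactly the problem:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// refresh the desired condition on the driver every 10 minutes;
// the open question is how to make the running query observe it
AtomicReference<String> currentFilter = new AtomicReference<>(getFilter());
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(
        () -> currentFilter.set(getFilter()), 10, 10, TimeUnit.MINUTES);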