
We are trying to use a dynamic filter for a Structured Streaming application.

Let's say we have the following pseudo-implementation of a Spark Structured Streaming application:

spark.readStream()
     .format("kafka")
     .option(...)
     ...
     .load()
     .filter(getFilter()) // <-- dynamic stuff, uses def filter(conditionExpr: String)
     .writeStream()
     .format("kafka")
     .option(.....)
     .start();

and getFilter returns string

String getFilter() {
   // dynamic stuff to create the expression
   return expression; // e.g. "column = true"
}

Is it possible in the current version of Spark to have a dynamic filter condition? I mean, the getFilter() method should dynamically return a filter condition (let's say it's refreshed every 10 minutes). We tried to look into broadcast variables, but we are not sure whether Structured Streaming supports such a thing.

It looks like it's not possible to update a job's configuration once it's submitted. We deploy on YARN.

Every suggestion/option is highly appreciated.


EDIT: assume getFilter() returns:

(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8

after 10 minutes we may have a small change (the first expression before the first OR is dropped) and potentially a new expression (columnA = 2), e.g.:

customHiveUDF(columnC, 'input') != 'required' OR columnD > 10 OR columnA = 2

The goal is to have multiple filters for one Spark application and not to submit multiple jobs.

VladoDemcak

2 Answers

2

A broadcast variable should be OK here. You can write a typed filter like:

query.filter(x => x > bv.value).writeStream(...)

where bv is a Broadcast variable. You can update it as described here: How can I update a broadcast variable in spark streaming?
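For illustration, here is a minimal Scala sketch of that typed-filter approach; the Kafka options, the Int threshold, and the re-broadcast comment are assumptions, not part of the original answer:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("typed-filter-demo").getOrCreate()
import spark.implicits._

// driver-side mutable reference to the broadcast value
var bv = spark.sparkContext.broadcast(10)

val values = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .as[String]

// typed filter: the lambda reads bv.value when each micro-batch is executed
val query = values
  .filter(x => x.length > bv.value)
  .writeStream
  .format("console")
  .start()

// a driver-side thread could periodically do:
//   bv.unpersist(); bv = spark.sparkContext.broadcast(newThreshold)
// (see the linked question about updating broadcast variables)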

Another solution is to provide e.g. an RPC or RESTful endpoint and ask that endpoint every 10 minutes. For example (Java, because it's simpler here):

class EndpointProxy {

     private static final long refreshRate = 10 * 60 * 1000; // 10 minutes
     private static Configuration lastValue;
     private static long lastUpdated;

     public static synchronized Configuration getConfiguration() {
          // refresh the cached value once it is older than refreshRate
          if (System.currentTimeMillis() - lastUpdated > refreshRate) {
               lastUpdated = System.currentTimeMillis();
               lastValue = askMyAPI();
          }
          return lastValue;
     }
}


query.filter (x => x > EndpointProxy.getConfiguration().getX()).writeStream()

Edit: a hacky workaround for the user's problem:

You can create a 1-row view:

// confsDF should be kept in some driver-side singleton
var confsDF = Seq(someContent).toDF("someColumn")

and then use:
query.crossJoin(confsDF.as("conf")) // cross join as we have only 1 value 
      .filter("hiveUDF(conf.someColumn)")
      .writeStream()...

new Thread() {
    override def run(): Unit = {
        // periodically refresh the driver-side singleton
        confsDF = Seq(someNewData).toDF("someColumn")
    }
}.start()

This hack relies on Spark's default execution model, micro-batches. In each trigger the query is rebuilt, so the new data should be taken into consideration.

You can also, in the thread, do:

Seq(someNewData).toDF("someColumn").createOrReplaceTempView("conf")

and then in query:

.crossJoin(spark.table("conf"))

Both should work. Keep in mind that it won't work with Continuous Processing mode.
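Putting the temp-view variant together end to end, here is a rough Scala sketch; fetchFilterValue(), the Kafka topics, the checkpoint path, the 10-minute refresh loop, and the registered customHiveUDF are all assumptions added for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("dynamic-filter-demo").enableHiveSupport().getOrCreate()
import spark.implicits._

// stub standing in for whatever service owns the current filter input
def fetchFilterValue(): String = "someValue"

// seed the single-row "conf" view
Seq(fetchFilterValue()).toDF("someColumn").createOrReplaceTempView("conf")

// driver-side refresher: replaces the view every 10 minutes
new Thread() {
  override def run(): Unit = {
    while (true) {
      Thread.sleep(10 * 60 * 1000)
      Seq(fetchFilterValue()).toDF("someColumn").createOrReplaceTempView("conf")
    }
  }
}.start()

// assumes customHiveUDF is already registered in the session
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input")
  .load()
  .crossJoin(spark.table("conf").as("conf"))   // one row, so the join just attaches it
  .filter("customHiveUDF(conf.someColumn, 'input') != 'required'")
  .select("key", "value")                      // keep only the columns the Kafka sink needs
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output")
  .option("checkpointLocation", "/tmp/dynamic-filter-checkpoint")
  .start()

query.awaitTermination()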

T. Gawęda
  • Thanks for your reply! Also we use the `def filter(conditionExpr: String)` filter - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1494 - with a string and conditions, and we would like to use this function and avoid the "functional way" as in your examples. I was thinking of having a broadcast `bv.value` which returns a String and is periodically updated. If it were not possible to use a broadcast we would use your second suggestion, but again with a string, not the functional way. Do you think it's possible to do it the "string way"? I thought the job is "immutable" after submit. – VladoDemcak Jun 18 '18 at 12:11
  • @VladoDemcak Spark creates a literal from your input, so it won't be possible to use a String here. Literals are immutable. – T. Gawęda Jun 18 '18 at 12:12
  • Oh... thanks anyway, the info about literals is really important. But that's sad because we use Hive UDFs (as strings) in there. Do you know another "workaround" or something which could be used to achieve the same results as we have right now (using `filter(String condition)`) and also have a dynamic filter? I checked `where()` but it's probably the same. – VladoDemcak Jun 18 '18 at 12:23
  • @VladoDemcak Where creates the same expression internally. How do you use this UDF? Maybe what I've put in the edit will be possible. – T. Gawęda Jun 18 '18 at 14:17
  • Thanks for the edit, will check it and give it a try in the near future. Please check my edit - I have added an example of how we use and prepare the custom filter (string) expression, and a detailed explanation of the use case we want to achieve. – VladoDemcak Jun 18 '18 at 16:51
  • So based on the documentation: `df.select("device").where("signal > 10")` (select the devices which have signal more than 10, using the untyped API) vs. `ds.filter(_.signal > 10).map(_.device)` (using the typed API) - we can replace 10 with a broadcast variable for example in the typed API, but we cannot for the untyped API? – VladoDemcak Jun 18 '18 at 17:12
  • @VladoDemcak It's correct - Spark converts 10 to a literal, which is immutable. A lambda is a "black box" that cannot be pushed down to the source, but it also gives the opportunity to have a more dynamic style of filter. – T. Gawęda Jun 18 '18 at 17:14
0

Here is a simple example in which I am dynamically filtering records coming from a socket. Instead of the date you can use any REST API which can update your filter dynamically, or a lightweight ZooKeeper instance.

Note: If you are planning to use a REST API, ZooKeeper, or any other option, use mapPartitions instead of filter, because then you call the API/connection once per partition; a rough sketch of that is shown after the example below.

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Keep only the lines whose value equals the current minute (a stand-in for a dynamic filter)
val words = lines.as[String].filter(_ == new java.util.Date().getMinutes.toString)

// Generate running word count
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
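As a rough sketch of the mapPartitions suggestion from the note above (fetchFilter() and the per-partition lookup are assumptions, not part of the original answer), it could look like:

// assumes `lines` is the streaming DataFrame from above and that fetchFilter()
// asks the REST API / ZooKeeper for the current filter value
def fetchFilter(): String = new java.util.Date().getMinutes.toString  // stub

val words = lines.as[String].mapPartitions { iter =>
  val currentFilter = fetchFilter()   // one call per partition, not per record
  iter.filter(_ == currentFilter)
}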
Kaushal
  • It's OK, it's a similar suggestion to the one I wrote. But the OP says he wants the filter to be a string, not a lambda. – T. Gawęda Jun 18 '18 at 16:20
  • Exactly as @T. Gawęda said, so I cannot give you an upvote for the answer even though it's useful. We don't want to use a lambda but a string. Basically we need to avoid submitting multiple jobs just because of a small change in the filter. – VladoDemcak Jun 18 '18 at 16:52
  • That's correct, but you can generate whatever string you want. It gives you all kinds of possibilities. You can write a lambda which calls the API and gets you the desired string. – Kaushal Jun 18 '18 at 17:02
  • Sure, but it's not a simple comparison of strings in the OP, since the where expression we have is not a lambda expression. When we call an API and get a string, that string would basically be a where expression which Spark's SQL parser converts into weird stuff which is useless because, as @T.Gawęda said in his answer, it's immutable. I was thinking about having a transform function or something, but I need to play around a little bit. – VladoDemcak Jun 18 '18 at 19:18