
I'm doing something like pattern matching in a Spark Streaming app. What I want is to update a variable shared with the workers, like a broadcast variable, except that broadcast variables are immutable. Is there a way to do that? Any advice?

EDIT

Sorry for not being so clear. I am doing some CEP (complex event processing) on logs. I need to load the rules from Elasticsearch while the Spark application is running, and I want to apply these rules on the worker side (on each RDD).

  • Do you mean global state at the driver level or a local variable at the worker level? – axlpado - Agile Lab Nov 17 '15 at 07:36
  • Could you add some example code that could help clarify your question? – maasg Nov 17 '15 at 10:24
  • @axlpado Thanks for the reply. I mean a variable at the worker level. – louis lau Nov 18 '15 at 01:35
  • @maasg Sorry for not being clear. I've already updated my question. Thanks. – louis lau Nov 18 '15 at 01:36
  • Everything that is sent to workers from the driver (including broadcasts) is serialized there once and is read-only; there are no cross-executor updates. The way a worker can ask for dynamic state lies outside Spark's management system: JDBC, a shared filesystem like Tachyon, etc. – Elena Viter Nov 20 '15 at 21:15
  • @ElenaViter Sorry for the late reply. Apart from Tachyon, how can I achieve this with JDBC, Akka, or a thread? Can you give me some examples? (A sketch of the JDBC approach follows these comments.) – louis lau Nov 24 '15 at 01:10
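
To illustrate the JDBC approach mentioned in the comments above, here is a minimal sketch: each executor JVM lazily loads and periodically refreshes rules from an external database, entirely outside Spark's broadcast mechanism. The connection URL, table name, and String rule type are assumptions for illustration, not part of the original question.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class RuleStore {

    private static List<String> rules = new ArrayList<>();
    private static long lastLoadedAt = 0L;
    private static final long REFRESH_MS = 60_000; // refresh at most once a minute

    // Call this from worker-side code (e.g. inside mapPartitions). The static
    // state lives per executor JVM, so each executor refreshes independently.
    public static synchronized List<String> getRules() throws SQLException {
        long now = System.currentTimeMillis();
        if (now - lastLoadedAt > REFRESH_MS) {
            // Hypothetical connection URL and table; replace with your own store.
            try (Connection conn = DriverManager.getConnection("jdbc:postgresql://host/rulesdb");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT pattern FROM cep_rules")) {
                List<String> fresh = new ArrayList<>();
                while (rs.next()) {
                    fresh.add(rs.getString("pattern"));
                }
                rules = fresh;
                lastLoadedAt = now;
            }
        }
        return rules;
    }
}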

1 Answer


The idea here is to write a wrapper over the broadcast variable that gets refreshed periodically. The catch is to call this function inside transform (or any other variation of it), which allows RDD-to-RDD operations.

Code snippet for the BroadcastWrapper class:

import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    // The wrapped broadcast variable; re-created whenever the data is refreshed.
    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null) {
                broadcastVar.unpersist();
            }
            lastUpdatedAt = new Date(System.currentTimeMillis());

            // Your logic to refresh the reference data
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}

To use this method, we can do something like this:

objectStream.transform(rdd -> {
    Broadcast<ReferenceData> refdataBroadcast =
            BroadcastWrapper.getInstance().updateAndGet(rdd.context());

    /* Your transformation logic here, using refdataBroadcast.value() */

    return rdd;
});
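
The getRefData() call above is left unimplemented in the answer. Since the question loads rules from Elasticsearch, one possible implementation could use the low-level Elasticsearch REST client; this is a sketch only, and the host, the "rules" index name, and the ReferenceData.fromJson helper are assumptions:

import java.io.IOException;

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

private ReferenceData getRefData() {
    // Hypothetical host and index; adjust to your cluster and rule schema.
    try (RestClient client = RestClient.builder(
            new HttpHost("localhost", 9200, "http")).build()) {
        Response response = client.performRequest(new Request("GET", "/rules/_search"));
        String json = EntityUtils.toString(response.getEntity());
        return ReferenceData.fromJson(json); // assumed parsing helper
    } catch (IOException e) {
        throw new RuntimeException("Failed to load rules from Elasticsearch", e);
    }
}

In production you would keep a single client open across refreshes rather than building one per call.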

Please see my answer on another thread for better clarity: https://stackoverflow.com/a/41259333/3166245

  • While this link may answer the question, it is better to include the essential parts of the answer here and provide the link for reference. Link-only answers can become invalid if the linked page changes. - [From Review](/review/low-quality-posts/14661368) – Rajesh Ujade Dec 21 '16 at 12:24
  • New to answering questions... Edited the post. Thanks @rajeshujade – Aastha Dec 21 '16 at 12:37