
In my code I am trying to come up with a way of having a Map to use as reference for certain operations. To make an example (pseudocode):

JavaDStream miao = create new DStream;
Map<String, String[]> dictionary = new HashMap<String, String[]>(); // would I need a Hashtable in this case?
miao.foreachRDD(rdd -> {
    rdd.foreach(line -> { // line is a String[]
        if (dictionary.containsKey(line[0])) {
            if (dictionary.get(line[0])[INDEX].equals(line[INDEX])) {
                append line on csv file;
                dictionary.put(line[0], line);
            } else {
                append line in another file;
            }
        } else {
            dictionary.put(line[0], line);
        }
    });
});

Cases like this are frequent in my application: check whether something was already processed, do one thing if it was and another if it wasn't, so I need to find a way to do this.
I have read a lot today about broadcast variables and checked:

  • ways to update them, but that looks a little unfeasible in my case;
  • Accumulators: I read that they are not readable by workers, so even custom ones are useless to me;
  • finally, I tried to use variables defined outside the DStreams, but I get an error and have to make the variable final (would this work, I wonder: the Map is final, but its content may change).

If I delegate the Map to another, serializable class and make it static there, will I have something like a streamable collection? From what I have understood, I think not: it would be changed locally, but the other workers would never receive any update.

EDIT: As promised, although I'm late: what I have to do is this:

  • get a string containing a numerical dangerParameter which has a range of tolerance;
  • process that string to create a deviceKey;
  • the deviceKey is matched against a broadcast dictionary, and the initial string is enriched with the dictionary information;
  • at this point the analysis runs (using a Map(deviceKey, [counter ; elapsed time]) to weed out single anomalies): if the dangerParameter is out of bounds -> send an email;
  • but if that parameter really is out of bounds, the next line the device sends will probably carry another triggering value;
  • hence, I store it (currently in a map; today I will try @maasg's solution) with the deviceKey as key and a timestamp as value;
  • so the next line with the triggering deviceKey is looked up in the map, and a new alert is sent only if the stored timestamp is older than an hour. The code:

private static final Map<String, String> alertsAlreadySent = new Hashtable<String, String>(); // map of device id -> timestamp

    public static void sendMail(String whoTo, String[] whoIsDying){ // email address, string enriched with person data
        if( (!alertsAlreadySent.containsKey(whoIsDying[CSVExampleDevice.DEVICE_ID])) // if it's not already in the map
                || // OR, short-circuited:
             ((Long.parseLong(whoIsDying[CSVExampleDevice.TIMESTAMP]) - Long.parseLong(alertsAlreadySent.get(whoIsDying[CSVExampleDevice.DEVICE_ID]))) > 3600000)
             ){ // he was already dying, but an hour has passed, so it may be a new alert
            indeedSendMail(whoTo, whoIsDying); // a function to send the mail
            alertsAlreadySent.put(whoIsDying[CSVExampleDevice.DEVICE_ID], whoIsDying[CSVExampleDevice.TIMESTAMP]);
            // email sent, so we update the timestamp in the map
        }
    }

I have other such cases around.
Would a stateful DStream easily replace these methods?

Vale
  • Have you thought about using distributed cache for that purpose? – Taras Matyashovskyy Jun 08 '16 at 20:11
  • Like? You mean a cached PairRDD? – Vale Jun 08 '16 at 20:36
  • Could you add some sample data to make this more clear? Also, I don't understand why the condition is "when the value already known is equal to the new value, we put it in the Map" (second `if` clause). Shouldn't this be the other way around? i.e. when we get a new value, we store that value? – maasg Jun 09 '16 at 11:25
  • Indeed, it was the other way around. I will edit the question within today, as soon as I'm over with another job. Thank you @maasg this is the second one you may be answering me. – Vale Jun 09 '16 at 12:36

1 Answer


I think that the intention here is to preserve some state as the data stream is being processed. At the same time, we want to classify data in the stream in the following way:

  • the key is unknown => becomes known
  • the key is known, but its value does not match our state => add record to file "A", remember the new value
  • the key is known, and the value is equal to the value we know => add record to file "B"

Q: Could we use a Java Map readable and writable by workers?

NO

In Spark Streaming, seen as a distributed system, we cannot use a mutable collection across executors and expect changes to that structure to propagate. Those objects live in the JVM that creates them, and all changes remain local to that JVM. There are ways to do that (e.g. CRDTs), but they would require additional messaging infrastructure between the executors. Another option would be centralized storage, like a distributed cache or a database.
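As a plain-JVM illustration of why this fails (no Spark here; `Holder` and `LocalCopyDemo` are made-up names for this sketch), serializing and deserializing an object holding a map — roughly what happens when a closure is shipped to each executor — produces independent copies, so a "put" on one copy is invisible to the others:

```scala
import java.io._
import scala.collection.mutable

// A serializable holder, standing in for a closure that captures a Map.
case class Holder(dict: mutable.Map[String, String]) extends Serializable

object LocalCopyDemo {
  // Round-trip through Java serialization, as Spark does when shipping closures.
  def copy(h: Holder): Holder = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(h)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[Holder]
  }

  def main(args: Array[String]): Unit = {
    val driver = Holder(mutable.Map("k" -> "v0"))
    val executorA = copy(driver) // what executor A would receive
    val executorB = copy(driver) // what executor B would receive

    executorA.dict("k") = "v1"   // an "update" performed on executor A

    // Neither the driver's map nor executor B's copy sees the change.
    println(driver.dict("k"))    // v0
    println(executorB.dict("k")) // v0
  }
}
```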

Q: Can we do this some other way?

YES

Spark Streaming supports stateful transformations that allow us to model such a process. We do need to change the approach to make this work: instead of verifying a condition and taking an action, as the original question aims to do, we are going to label entries, build our state, and then group the results to optimize I/O operations. (I'm going to use Scala. The Java approach uses very much the same API, bearing the added verbosity and the lack of pattern-matching capabilities.)

val dstream = ??? // my dstream here

// We define a state update function that implements our business logic in dealing with changing values

    def stateUpdateFunction(
        key: String,
        value: Option[Array[String]],
        state: State[String]): Option[(String, String, Array[String])] = {

      val stateValue = state.getOption()   // get the current state for the given key
      val label = (stateValue, value) match {
        case (None, Some(newValue)) =>     // new key!
          state.update(newValue(0))        // update session data
          "NEW_KEY"                        // this is the resulting label for this entry
        case (Some(oldValue), Some(newValue)) if (oldValue == newValue(0)) => // we know the key and the value is the same; we don't update the state
          "SAME_VALUE"
        case (Some(oldValue), Some(newValue)) if (oldValue != newValue(0)) => // the value is not the same, so we store the new value
          state.update(newValue(0))
          "NEW_VALUE"
        case (None, None) => "NOP"           // do nothing
        case (Some(oldValue), None) => "NOP" // do nothing
      }
      value.map(v => (label, key, v)) // return the computed label together with the key and the given value
    }

val stateSpec = StateSpec.function(stateUpdateFunction _)

// transform the original stream into a key/value dstream that preserves the original data in the value
val keyedDstream = dstream.map(elem => (elem(0), elem)) 

// Add labels to the data using a stateful transformation
val mappedDstream = keyedDstream.mapWithState(stateSpec)

// remove the "None" entries and unwrap the values in the stream
val labeledDataDStream = mappedDstream.flatMap(identity) // equivalent to filtering out the Nones and extracting the Some contents

// Now, labeledDataDStream contains our data labeled, we can proceed to filter it out in the different required subsets                           

val changedValuesDStream = labeledDataDStream.filter{ case (label, key, value) => label == "NEW_VALUE" }
val sameValuesDStream = labeledDataDStream.filter{ case (label, key, value) => label == "SAME_VALUE" }
val newKeyDStream = labeledDataDStream.filter{ case (label, key, value) => label == "NEW_KEY" }

// we can save those datasets to disk (or store in a db, ...)

changedValuesDStream.saveAsTextFiles("...")
sameValuesDStream.saveAsTextFiles("...")
newKeyDStream.saveAsTextFiles("...")
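The per-key decision itself is plain Scala and can be exercised without Spark. This is only a sketch: `LabelLogic` is a hypothetical object whose `label` function mirrors the state update function above, returning the label plus the new state instead of mutating a `State` object:

```scala
object LabelLogic {
  // Mirrors stateUpdateFunction: given the previous state (the last seen
  // value of element 0) and the incoming record, return (label, newState).
  def label(stateValue: Option[String],
            value: Option[Array[String]]): (String, Option[String]) =
    (stateValue, value) match {
      case (None, Some(newValue))                             => ("NEW_KEY", Some(newValue(0)))
      case (Some(old), Some(newValue)) if old == newValue(0)  => ("SAME_VALUE", Some(old))
      case (Some(_), Some(newValue))                          => ("NEW_VALUE", Some(newValue(0)))
      case (_, None)                                          => ("NOP", stateValue)
    }
}
```

For example, `LabelLogic.label(Some("a"), Some(Array("b", "x")))` yields `("NEW_VALUE", Some("b"))`, matching the branch that overwrites the stored value.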
maasg
  • Thank you. I will try out the stateful DStream today; meanwhile I updated my question to better explain what my current task is – Vale Jun 10 '16 at 08:54