In my code I am trying to come up with a way of having a Map to use as a reference for certain operations. To give an example (pseudocode):
```java
JavaDStream<String[]> miao = ...; // create new DStream
Map<String, String[]> dictionary = new HashMap<>(); // would I need a Hashtable in this case?
miao.foreachRDD(rdd -> {
    rdd.foreach(line -> { // line is a String[]
        if (dictionary.containsKey(line[0])) {
            if (dictionary.get(line[0])[INDEX].equals(line[INDEX])) {
                // append line to a CSV file
                dictionary.put(line[0], line);
            } else {
                // append line to another file
            }
        } else {
            dictionary.put(line[0], line);
        }
    });
});
```
These kinds of cases are frequent in my application: check whether a record was already processed, do one thing in one case and another thing in the other, so I need to find a way to do this.
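The routing decision in the pseudocode does not itself depend on Spark: given the previous entry for a key and the new line, it picks a branch and possibly updates the entry. A minimal single-JVM sketch of just that decision follows; `DedupRouter`, the `Route` names and the value of `INDEX` are hypothetical. Keeping it a pure function of (previous state, new line) is roughly the shape a per-key state update in Spark Streaming expects, so it could later be moved there.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-JVM version of the "already processed?" check (no Spark).
public class DedupRouter {
    public enum Route { FIRST_SEEN, SAME_AS_BEFORE, CHANGED }

    static final int INDEX = 1; // assumed position of the compared field

    // Picks the branch for a line and updates the per-key entry like the pseudocode does.
    public static Route route(Map<String, String[]> dictionary, String[] line) {
        String[] previous = dictionary.get(line[0]);
        if (previous == null) {
            dictionary.put(line[0], line); // key never seen: just remember it
            return Route.FIRST_SEEN;
        }
        if (previous[INDEX].equals(line[INDEX])) {
            dictionary.put(line[0], line); // same value: the "append to CSV" branch
            return Route.SAME_AS_BEFORE;
        }
        return Route.CHANGED; // different value: the "another file" branch, entry left as is
    }

    public static void main(String[] args) {
        Map<String, String[]> dictionary = new HashMap<>();
        System.out.println(route(dictionary, new String[]{"dev1", "ok"}));   // FIRST_SEEN
        System.out.println(route(dictionary, new String[]{"dev1", "ok"}));   // SAME_AS_BEFORE
        System.out.println(route(dictionary, new String[]{"dev1", "fail"})); // CHANGED
    }
}
```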
I have read a lot today about this:
- I looked at broadcast variables and ways to update them, but that looks unfeasible in my case.
- I read that accumulators are not readable by workers, so even custom ones are useless to me.
- Finally, I tried to use variables declared outside the DStreams, but I get an error and have to make the variable final (I wonder whether that would work: the Map reference is final, but its content can still change).
If I move the Map to another class, a serializable one, and make it static there, will I get something like a collection shared by all workers? From what I understand, no: it will be changed "locally", but the other workers won't receive any update.
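A minimal way to see why a captured map cannot act as shared state, assuming captured variables are shipped with Java serialization (Spark's default closure serializer): each task works on a deserialized copy, so writes never travel back. A static field is not serialized at all; each JVM simply has its own instance of it. The sketch only simulates the closure case on one JVM with `ObjectOutputStream`/`ObjectInputStream`; `ClosureCopyDemo` and `roundTrip` are illustrative names and no Spark code is involved.

```java
import java.io.*;
import java.util.HashMap;

// Simulates shipping a captured object to a task: serialize, then deserialize a copy.
public class ClosureCopyDemo {
    public static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value); // "ship" the captured object
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject(); // what the task would work on
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> driverMap = new HashMap<>();
        driverMap.put("dev1", "1000");
        HashMap<String, String> taskCopy = roundTrip(driverMap);
        taskCopy.put("dev2", "2000");                      // update made "inside the task"
        System.out.println(driverMap.containsKey("dev2")); // false: the original never sees it
    }
}
```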
EDIT: As promised, although late, here is what I have to do:
- Receive a string containing a numerical dangerParameter that has a tolerance range.
- Process that string to build a deviceKey.
- Look up the deviceKey in a broadcast dictionary and enrich the initial string with the dictionary information.
- Then run the analysis (done with a Map of deviceKey -> [counter; elapsed time], to weed out isolated anomalies): if the dangerParameter is out of bounds, send an email.
- But if that parameter really is out of bounds, the next line the device sends will probably carry another triggering value.
- Hence, I store a timestamp with the deviceKey as the key (currently in a map; today I will try @maasg's solution).
- So the next line with the triggering deviceKey is looked up in the map, and another email is sent only if the stored timestamp is more than an hour old. The code:
```java
// Device ID -> timestamp (ms, as a string) of the last alert sent
private static final Map<String, String> alertsAlreadySent = new Hashtable<String, String>();

public static void sendMail(String whoTo, String[] whoIsDying) { // email address, line enriched with person data
    String deviceId = whoIsDying[CSVExampleDevice.DEVICE_ID];
    long now = Long.parseLong(whoIsDying[CSVExampleDevice.TIMESTAMP]);
    String lastSent = alertsAlreadySent.get(deviceId);
    if (lastSent == null                                    // not already in the map
            || now - Long.parseLong(lastSent) > 3600000L) { // already dying, but over an hour ago: may be a new alert
        indeedSendMail(whoTo, whoIsDying); // a function to send the mail
        // Email sent: update the timestamp in the map
        alertsAlreadySent.put(deviceId, whoIsDying[CSVExampleDevice.TIMESTAMP]);
    }
}
```
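A Spark-free sketch of the same one-hour throttle, assuming epoch-millisecond timestamps kept as `long` (the elapsed time is the current timestamp minus the stored one). It uses `ConcurrentHashMap.compute`, which runs atomically per key, so the check and the update cannot interleave between threads. `AlertThrottle` and `tryAcquire` are hypothetical names. Like the code above, it is only correct if every line for a device reaches the same map instance, which is exactly the open question here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical one-JVM alert throttle: at most one alert per device per hour.
public class AlertThrottle {
    static final long ONE_HOUR_MS = 3_600_000L;

    private final Map<String, Long> lastAlertByDevice = new ConcurrentHashMap<>();

    // Returns true (and records nowMs) if an alert should be sent for this device now.
    public boolean tryAcquire(String deviceId, long nowMs) {
        boolean[] send = {false};
        lastAlertByDevice.compute(deviceId, (id, last) -> {
            if (last == null || nowMs - last > ONE_HOUR_MS) {
                send[0] = true;
                return nowMs; // new alert: remember when it was sent
            }
            return last;      // still within the hour: keep the old timestamp
        });
        return send[0];
    }

    public static void main(String[] args) {
        AlertThrottle throttle = new AlertThrottle();
        System.out.println(throttle.tryAcquire("dev1", 0L));         // true: first alert
        System.out.println(throttle.tryAcquire("dev1", 1_000L));     // false: within the hour
        System.out.println(throttle.tryAcquire("dev1", 3_600_001L)); // true: more than an hour later
    }
}
```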
I have other cases like this one.
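For example, the counter-and-elapsed-time analysis from the list above follows the same check-then-update shape. A minimal sketch, assuming the goal is to flag a device only after `MIN_HITS` out-of-range readings within `WINDOW_MS` of the first one; both thresholds, the class and the method names are hypothetical, and like the other sketches it keeps its state in a single JVM.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical filter for isolated anomalies: only repeated out-of-range readings count.
public class AnomalyFilter {
    static final int MIN_HITS = 3;           // assumed threshold
    static final long WINDOW_MS = 60_000L;   // assumed window length

    // Per-device state: number of out-of-range readings and when the first one arrived.
    static final class Window {
        int count;
        long firstMs;
        Window(long firstMs) { this.count = 1; this.firstMs = firstMs; }
    }

    private final Map<String, Window> state = new HashMap<>();

    // Call for each out-of-range reading; returns true once the anomaly looks persistent.
    public boolean onOutOfRange(String deviceKey, long nowMs) {
        Window w = state.get(deviceKey);
        if (w == null || nowMs - w.firstMs > WINDOW_MS) {
            state.put(deviceKey, new Window(nowMs)); // start (or restart) the window
            return false; // a single reading never confirms an anomaly (MIN_HITS > 1)
        }
        w.count++;
        return w.count >= MIN_HITS;
    }

    public static void main(String[] args) {
        AnomalyFilter filter = new AnomalyFilter();
        System.out.println(filter.onOutOfRange("dev1", 0L));      // false: 1 hit
        System.out.println(filter.onOutOfRange("dev1", 10_000L)); // false: 2 hits
        System.out.println(filter.onOutOfRange("dev1", 20_000L)); // true: 3 hits within 60 s
    }
}
```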
Could a stateful DStream (e.g. one built with updateStateByKey) easily replace these methods?