I'm investigating Spark Streaming as a solution for an anti-fraud service I am building, but I am struggling to figure out exactly how to apply it to my use case. The use case is: data from a user session is streamed, and a risk score is calculated for a given user once 10 seconds of data have been collected for that user. I plan to use a batch interval of 2 seconds, but need to use data from the full 10 second window.

At first, updateStateByKey() seemed like the perfect solution, as I could build up a UserRisk object from the events the system collects. The trouble is, I am not sure how to tell Spark to stop updating a user after the 10 seconds have passed: at the 10 second mark, I run our inference engine against the UserRisk object and persist the result.

The other approach is the window transformation. The issue with the window transformation is that I would have to dedup data manually, which might be wasteful.

Any suggestions on how to tell updateStateByKey to stop reducing on a certain key after an interval of time has passed?
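For reference, the rough shape of the streaming setup I have in mind is below (a sketch; the app name and checkpoint directory are placeholders, and checkpointing is required for stateful operations like updateStateByKey):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("fraud-risk")  // placeholder name
val ssc = new StreamingContext(conf, Seconds(2))     // 2 second batch interval
ssc.checkpoint("/tmp/fraud-checkpoint")              // placeholder path; needed for stateful ops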
2 Answers
If you don't want to use windowing, you can reduce the batch interval to 1s and then, inside updateStateByKey, also maintain an incremental counter and bypass the update logic once it reaches 10:
val userState = myDstreamByKey.updateStateByKey(
  (newValues: Seq[Row], runningState: Option[(UserRisk, Int)]) => {
    runningState match {
      // first batch seen for this key: start accumulating, counter at 1
      case None => Some((updateUserRisk(initialUserRisk, newValues), 1))
      // fewer than 10 batches so far: keep folding new events in
      case Some((risk, count)) if count < 10 =>
        Some((updateUserRisk(risk, newValues), count + 1))
      // 10 batches (10 seconds at a 1s interval) reached: freeze the state
      case frozen => frozen
    }
  })
For simplicity, the None branch above just starts from a placeholder initialUserRisk value; you have to handle initialization according to your business logic, which I don't know.
In my example, Row is a fake case class that represents your original data and UserRisk is the accumulating state; the updateUserRisk function contains your business logic for updating the UserRisk.
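As a possible next step (a sketch, not something from the original question): once the counter reaches 10 you can pick those keys out of the state stream and hand them to your inference engine. scoreAndPersist below is a placeholder for your own inference-and-persist call. Note that because the state above stays frozen at a count of 10, this filter would match again on every later batch; to fire only once you could mark scored keys, or evict them by returning None from the update function (which removes the key from the state).

userState
  .filter { case (_, (_, count)) => count == 10 }  // keys that just completed 10 batches
  .foreachRDD { rdd =>
    rdd.foreach { case (userId, (risk, _)) =>
      scoreAndPersist(userId, risk)                // placeholder: inference + persistence
    }
  }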

For your case, you can try the reduceByKeyAndWindow DStream function; it should fulfill your requirement.
Here is sample code in Java:
JavaPairDStream<String, Integer> counts = pairs.reduceByKeyAndWindow(
    // forward reduce: fold values entering the window
    new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    },
    // inverse reduce: back out values leaving the window
    new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer i1, Integer i2) {
        return i1 - i2;
      }
    },
    new Duration(10 * 1000),  // window length: the 10 seconds you need
    new Duration(2 * 1000));  // slide interval: your 2 second batch
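If you go this way, a Scala sketch of the same idea applied to your case might look like the following. mergeRisk and unmergeRisk are hypothetical associative combine and inverse functions over UserRisk, riskFromRow is a hypothetical conversion from your raw events, and the inverse-function variant of reduceByKeyAndWindow requires checkpointing to be enabled on the StreamingContext:

import org.apache.spark.streaming.Seconds

val windowedRisk = myDstreamByKey
  .mapValues(row => riskFromRow(row))  // hypothetical Row => UserRisk
  .reduceByKeyAndWindow(
    mergeRisk _,                       // fold new data into the window
    unmergeRisk _,                     // back out data that left the window
    Seconds(10),                       // window length: 10 seconds
    Seconds(2))                        // slide interval = your batch interval

The inverse function keeps the window incremental: each slide only adds the newest 2 seconds and subtracts the expired 2 seconds instead of recombining the whole window. If your risk aggregation has no sensible inverse, you can drop the second argument and Spark will recompute the full window on each slide.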
A useful reference:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations