I want to update a broadcast variable every minute, so I used the sample code given by Aastha in this question: How can I update a broadcast variable in Spark Streaming?
But it didn't work. The function updateAndGet() is only called when the streaming application starts. When I debug my code, it never goes into updateAndGet() a second time, so the broadcast variable is not updated every minute.
Why?
Here is my sample code.
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class BroadcastWrapper {
    private Broadcast<List<String>> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();
    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
        return jsc;
    }

    public Broadcast<List<String>> updateAndGet(JavaStreamingContext jsc) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        // Let's say we want to refresh every 1 min = 60000 ms
        if (broadcastVar == null || diff > 60000) {
            if (broadcastVar != null) {
                broadcastVar.unpersist();
            }
            lastUpdatedAt = new Date(System.currentTimeMillis());
            // Your logic to refresh
            // List<String> data = getRefData();
            List<String> data = new ArrayList<String>();
            data.add("tang");
            data.add("xiao");
            data.add(String.valueOf(System.currentTimeMillis()));
            broadcastVar = jsc.sparkContext().broadcast(data);
        }
        return broadcastVar;
    }
}
// Here is the computation code submitted to Spark Streaming.
lines.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
    Broadcast<List<String>> blacklist =
        BroadcastWrapper.getInstance().updateAndGet(jsc);

    @Override
    public JavaRDD<String> call(JavaRDD<String> rdd) {
        JavaRDD<String> dd = rdd.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String word) {
                if (blacklist.getValue().contains(word)) {
                    return false;
                } else {
                    return true;
                }
            }
        });
        return dd;
    }
});
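
To narrow down what I am seeing, here is a minimal sketch in plain Java (no Spark) of one possible explanation: blacklist is a field of the anonymous Function, so its initializer runs once, when the object is constructed, and not on every call(). The names FieldInitDemo and refresh() are hypothetical stand-ins for my code and BroadcastWrapper.updateAndGet(); java.util.function.Function stands in for Spark's Function. This only illustrates the Java initialization order and is an assumption about the cause, not a confirmed diagnosis.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class FieldInitDemo {
    // Hypothetical stand-in for BroadcastWrapper.updateAndGet(): counts how often it is invoked.
    static final AtomicInteger refreshCalls = new AtomicInteger();

    static String refresh() {
        return "value-" + refreshCalls.incrementAndGet();
    }

    // Same shape as the code in the question: refresh() is called in a field initializer.
    static int callsWithFieldInitializer(int batches) {
        refreshCalls.set(0);
        Function<String, String> f = new Function<String, String>() {
            String cached = refresh(); // runs once, when the anonymous object is created

            @Override
            public String apply(String batch) {
                return batch + ":" + cached;
            }
        };
        for (int i = 0; i < batches; i++) {
            f.apply("batch" + i);
        }
        return refreshCalls.get();
    }

    // Variant: refresh() is called inside the method body, so it runs on every invocation.
    static int callsInsideMethod(int batches) {
        refreshCalls.set(0);
        Function<String, String> f = new Function<String, String>() {
            @Override
            public String apply(String batch) {
                String fresh = refresh(); // runs each time apply() is invoked
                return batch + ":" + fresh;
            }
        };
        for (int i = 0; i < batches; i++) {
            f.apply("batch" + i);
        }
        return refreshCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsWithFieldInitializer(3)); // prints 1
        System.out.println(callsInsideMethod(3));         // prints 3
    }
}
```

If this is what is happening, then moving the BroadcastWrapper.getInstance().updateAndGet(jsc) call into the body of call(JavaRDD<String> rdd) should make the one-minute check run for each batch, since as far as I understand the function passed to transform is invoked on the driver in every batch interval.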