I have a RichCoFlatMapFunction in flink which connects two streams. This class has a constructor which initializes a member of Type1 It's dynamic configuration which we use in this flat map operator's flatmap2 function, to send out final data with some filter flag present in the object. This can be provided dynamically hence we have connected the Type stream in here.
The Configstream data will come in flatMap1 function.
The issue is as below:
- As the job is submitted, the obj is set from constructor.
- After some time the configStream receives some message and the obj gets updated in flatmap1
- Some failure happens in the app (say: Source or Sink side issue)
- Due to which the app operators fail and the job restarts.
- After restart, the last state of obj is not recovered, but the initial state of obj is recovered from the first point when we had it from constructor.
Code looks like below.
public class MyFunction extends RichCoFlatMapFunction<ConfigStream, Tuple2<String, Type2>, OutType> implements Serializable {
MyFunction(ConfigStream obj){
this.obj = obj;
}
private ConfigStream obj;
@Override
public void flatMap1(ConfigStream configStream, Collector<OutType> collector) throws Exception {
this.obj = configStream;
}
@Override
public void flatMap2(Tuple2<String, Type2> tuple, Collector<OutType> collector) throws Exception {
//Some operation from tuple to collector object here.
//For example
if(this.obj.someconfig=="ABC"){
//do this
}else{
//do that
}
collector.collect(something);
}
}