I have a use case for maintaining configuration in Flink that I don't really know how to handle.
Let's say that I have some configuration stored somewhere and I need it for my processing. When the Flink job initializes, I want to load the whole configuration.
This configuration can also be modified while the Flink job is running, so I must keep the state of this configuration in memory and update it when needed. The configuration updates are available from a Kafka source.
So here is what I have:
A function that loads the whole configuration, keeps it in state and associates it with my data stream:
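```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class MyConfiguration extends RichFlatMapFunction<Row, Row> {

    // In-memory copy of the configuration
    private transient MapState<String, MyConfObject> configuration;

    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, MyConfObject> descriptor = new MapStateDescriptor<>(
                "configuration",
                BasicTypeInfo.STRING_TYPE_INFO,
                ...
        );
        // getMapState() returns keyed state, which is only available on a KeyedStream
        configuration = getRuntimeContext().getMapState(descriptor);
        configuration.putAll(...); // Load configuration from somewhere
    }

    @Override
    public void flatMap(Row value, Collector<Row> out) throws Exception {
        MyConfObject conf = configuration.get(...);
        ... // Associate conf with data
        out.collect(value);
    }
}
```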
And my pipeline looks like this:
```java
DataStream<Row> dataStream = ...; // My data stream
DataStream<Map<String, MyConfObject>> streamConf =
        env.addSource(new FlinkKafkaConsumer<Row>(..., ..., ...)) // The stream of configuration updates
                .map(...);

return dataStream
        .assignTimestampsAndWatermarks(...)
        .flatMap(new MyConfiguration())
        ... // Do some processing
        .map(m -> {
            // Serialize each result to JSON bytes
            ObjectMapper objectMapper = new ObjectMapper();
            String json = objectMapper.writeValueAsString(m);
            return json.getBytes(StandardCharsets.UTF_8);
        });
```
What I want is to use the stream of configuration updates, `streamConf`, to update the state variable inside the `MyConfiguration` flat map function. How can I do that?
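To make the behaviour I am after concrete, here is a plain-Java model (no Flink involved) of a single operator with two inputs: one method receives data records and reads the configuration, the other receives the updates from `streamConf` and overwrites it. I assume the Flink counterpart would be a two-input function, such as a `CoFlatMapFunction` (`flatMap1`/`flatMap2`) on `dataStream.connect(streamConf)`, or a `BroadcastProcessFunction` (`processElement`/`processBroadcastElement`) if the updates are broadcast so that every parallel instance receives them. `ConfigEnrichmentSketch`, `Event`, `Enriched`, `ConfiguredEnricher` and the string-valued configuration are hypothetical stand-ins for `Row` and `MyConfObject`; the initial load is just a constructor argument.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ConfigEnrichmentSketch {

    // Hypothetical stand-ins for Row and MyConfObject: a record names the
    // configuration key it needs; configuration values are plain strings.
    public record Event(String configKey, String payload) {}
    public record Enriched(Event event, String conf) {}

    // Models one two-input operator: both methods share the same map,
    // the way the two inputs of a connected stream would share state.
    public static class ConfiguredEnricher {
        private final Map<String, String> configuration = new HashMap<>();

        public ConfiguredEnricher(Map<String, String> initialConfiguration) {
            // Initial load, done once when the operator starts
            configuration.putAll(initialConfiguration);
        }

        // Input 1: data records are enriched with the current configuration
        public void onEvent(Event event, Consumer<Enriched> out) {
            String conf = configuration.get(event.configKey());
            if (conf != null) { // records without configuration are dropped in this sketch
                out.accept(new Enriched(event, conf));
            }
        }

        // Input 2: configuration updates overwrite the in-memory entries
        public void onConfigUpdate(Map<String, String> update) {
            configuration.putAll(update);
        }
    }

    public static void main(String[] args) {
        List<Enriched> out = new ArrayList<>();
        ConfiguredEnricher enricher = new ConfiguredEnricher(Map.of("a", "v1"));

        enricher.onEvent(new Event("a", "x"), out::add);
        enricher.onConfigUpdate(Map.of("a", "v2", "b", "w1"));
        enricher.onEvent(new Event("a", "y"), out::add);
        enricher.onEvent(new Event("b", "z"), out::add);

        for (Enriched e : out) {
            System.out.println(e.event().payload() + " -> " + e.conf());
        }
    }
}
```

Running `main` prints `x -> v1`, `y -> v2` and `z -> w1`: records processed after an update see the new value. Dropping records whose key has no configuration is an arbitrary choice for the sketch.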