I'm trying to implement a messaging scenario using Apache Flink Stateful Functions.
By design, I need to calculate some statistics from incoming messages and store them in function state. After that, scenario functions will access this state together with the messages and run business rules on them. However, we may have dozens of scenarios per message, and each of them should run exactly once.
The code is more or less as follows:
@Override
public void configure(MatchBinder binder) {
    binder
        .predicate(Transaction.class, this::updateTransactionStatAndSendToScenatioManager);
}

private void updateTransactionStatAndSendToScenatioManager(Context context, Transaction transaction) {
    // state update
    // forward the transaction to the scenario manager, addressed by transaction id
    context.send(FnScenarioManager.TYPE, String.valueOf(transaction.id()), transaction);
}
FnScenarioManager:
@Override
public void configure(MatchBinder binder) {
    binder
        .predicate(Transaction.class, this::runTransactionScenarios);
}

private void runTransactionScenarios(Context context, Transaction transaction) {
    context.send(Scenario1.TYPE, String.valueOf(transaction.id()), transaction);
    context.send(Scenario2.TYPE, String.valueOf(transaction.id()), transaction);
    context.send(Scenario3.TYPE, String.valueOf(transaction.id()), transaction);
    ...
    context.send(ScenarioN.TYPE, String.valueOf(transaction.id()), transaction);
}
My question is: what happens if the cluster crashes in the middle of runTransactionScenarios?
- Will each scenario run exactly once? If not, how can I ensure that?
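To make the requirement concrete, here is a rough plain-Java sketch of the behaviour I'm after. It does not use any Stateful Functions API, and ScenarioDispatcher and dispatch are hypothetical names made up purely for illustration. The idea is that if the same transaction is delivered again (for example after a restart), no scenario should run for it a second time.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only: plain Java, not the Stateful Functions API.
class ScenarioDispatcher {
    private final List<String> scenarioNames;
    // Remembers "transactionId/scenarioName" pairs that have already run.
    private final Set<String> completed = new HashSet<>();

    ScenarioDispatcher(List<String> scenarioNames) {
        this.scenarioNames = scenarioNames;
    }

    // Returns how many scenarios actually ran for this delivery.
    int dispatch(String transactionId) {
        int ran = 0;
        for (String scenario : scenarioNames) {
            // Set.add returns false when the pair was already recorded,
            // so a redelivered transaction does not trigger the scenario again.
            if (completed.add(transactionId + "/" + scenario)) {
                ran++; // the scenario's business rules would run here
            }
        }
        return ran;
    }
}
```

With two scenarios registered, calling dispatch("42") twice should run both scenarios on the first call and none on the second. In this sketch the set lives in memory only, so it says nothing about what survives a real cluster crash, which is exactly the part I'm unsure about.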