2

I'm trying to implement messaging scenario using apache flink stateful functions.

By design I need to calculate some statistics from incoming messages and store them in the states. After that scenario functions will access these states and messages and run business rules on them. But we may have dozens of scenarios per message and each of them should run exactly once.

the code is more or less as follows

@Override
    public void configure(MatchBinder binder) {
        binder
            .predicate(Transaction.class,this::updateTransactionStatAndSendToScenatioManager)
}

    private void updateTransactionStatAndSendToScenatioManager(Context context, Transaction transaction){
        // state update
        context.send(FnScenarioManager.TYPE,  String.valueOf(transaction.id()) , transaction);
    }

FnScenarioManager:

@Override
    public void configure(MatchBinder binder) {
    binder
        .predicate(Transaction.class,this::runTransactionScenarios);
}


private void runTransactionScenarios(Context context, Transaction transaction){
   context.send(Scenario1.TYPE,String.valueOf(transaction.id()),transaction);
   context.send(Scenario2.TYPE,String.valueOf(transaction.id()),transaction);
   context.send(Scenario3.TYPE,String.valueOf(transaction.id()),transaction);
   ...
   context.send(ScenarioN.TYPE,String.valueOf(transaction.id()),transaction);
}

My question is what happens if cluster crash in the middle of runTransactionScenarios ?

  • Will each scenario run exactly once? if not how can I ensure that?

1 Answers1

1

Stateful Functions (and Apache Flink in general) supports exactly-once state semantics. What this means is that in the case of failure, the runtime will consistently roll back both state and messages in such a way as to simulate completely failure-free execution.

This means messages may be replayed but the internal state will be rolled back to the point in time before the message was received. So long as your business rules only modify statefun state and interact with the outside world through an egress, you can treat the system as having exactly once properties.

David Anderson
  • 39,434
  • 4
  • 33
  • 60
Seth
  • 345
  • 1
  • 4
  • What happens if Scenario1 update internal state then cluster fails before other scenario's update their internal state? If we use state as a counter it will be updated twice am I right? – Arif Ezberci Apr 30 '20 at 14:11
  • No, in that case scenario 1 will roll back its state to the value before the message and then reprocess it resulting in the same final state. While it will technically process the message twice, it will not result in incorrect results. Imagine your just doing counting, the current count is 5 and a message increments the count to 6. If a failure occurs the count will reset to 5 and then the message will be reprocessed resulting in the same correct result. – Seth Apr 30 '20 at 14:55
  • 1
    Let's assume we have 5 counting scenarios working in order as my original question. Scenario 1,2,3 incremented their state 4,5 did not. How does flink calculate the right scenario state for every scenario in case of failure. Is there a documentation about how states behave in case of failure? – Arif Ezberci Apr 30 '20 at 17:08