
The problem

I am implementing a microservice as an event-sourcing aggregate, which in turn is implemented as a Flink FlatMapFunction. In the basic setup, the aggregate reads events and commands from two Kafka topics. It then writes new events to the first topic and processing results to a third topic; Kafka therefore acts as the event store. I hope this drawing helps:

  RPC Request                              RPC Result
  |                                                 |
  ~~~~> Commands-|              |---> Results ~~~~~~|
                 |-->Aggregate--|
  ~> Input evs. -|              |---> output evs. ~~~
  |                                                 |
  ~~~~~<~~~~~~~~~~<~~~feedback loop~~~~~<~~~~~~~~<~~~

Since Kafka is not checkpointed, commands could potentially be replayed twice, and it seems that output events could also be written to the topic twice.

How can the state be recovered in these cases with repeated messages? Is it possible for the aggregate to know when its input streams are up to date, so that it can start processing commands?

My thoughts

I have thought of several solutions:

  1. If Flink supported rolling back unconfirmed events, a Sink could be implemented that gets the current offset from the event source. When restarted, this sink would remove the newer-than-offset events from the Kafka topic. This way, the KafkaSource and KafkaSink would be generated from the same builder and then exposed to the topology. This solution has a serious problem: other services could already have read the newer events in the topic, causing inconsistency.

  2. If removing events as in 1 is not possible, a stateful source could read events from the recorded offset, try to match the repeated events in the aggregate, and drop them. This option does not seem robust: the matching may not be deterministic, it would have to be rethought for each aggregate and topology, and it would not guarantee recovery (e.g. in the case of consecutive restarts). Therefore this is a bad solution.
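
     The matching in option 2 can at least be made mechanical when events carry a per-aggregate, monotonically increasing sequence number. A minimal sketch in plain Java (the `SequenceDeduplicator` name and the `(aggregateId, seqNo)` fields are assumptions, not part of the original setup):

     ```java
     import java.util.HashMap;
     import java.util.Map;

     // Drops replayed events by remembering the highest sequence number
     // seen per aggregate. Assumes producers assign strictly increasing
     // sequence numbers within each aggregate.
     class SequenceDeduplicator {
         private final Map<String, Long> lastSeen = new HashMap<>();

         /** Returns true if the event is new and should be processed. */
         boolean accept(String aggregateId, long seqNo) {
             Long last = lastSeen.get(aggregateId);
             if (last != null && seqNo <= last) {
                 return false; // replayed duplicate: drop it
             }
             lastSeen.put(aggregateId, seqNo);
             return true;
         }
     }
     ```

     Note this state itself would need to be checkpointed with the operator, otherwise a restart loses the dedup history as well.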

  3. A different approach is to create a special KafkaSource with two special watermarks. The first, KafkaSourceStartedWatermark, is always sent at source startup to notify dependent operators; when this watermark is sent, the source internally records the current Kafka offset. The second, KafkaSourceUpToDateWatermark, is sent by the source when that offset is reached. These watermarks would travel through the topology transparently. Operators would handle these watermarks by implementing a special WatermarkNotifiable interface. The aggregate would then be able to buffer or drop RPC commands until it is up to date on every input source.

    interface WatermarkNotifiable {
        void started(String watermarkId);  // KafkaSourceStartedWatermark
        void upToDate(String watermarkId); // KafkaSourceUpToDateWatermark
    }
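
    A minimal sketch, in plain Java without Flink dependencies, of how an aggregate implementing this interface could buffer commands until every input source has reported upToDate() (the `CommandBufferingAggregate` name and its string-based command type are assumptions for illustration):

    ```java
    import java.util.ArrayDeque;
    import java.util.HashSet;
    import java.util.Queue;
    import java.util.Set;

    // Buffers incoming commands until all configured input sources have
    // signalled that they are up to date, then flushes the buffer.
    class CommandBufferingAggregate {
        private final Set<String> pendingSources;                    // sources not yet up to date
        private final Queue<String> bufferedCommands = new ArrayDeque<>();
        private final Queue<String> processed = new ArrayDeque<>();

        CommandBufferingAggregate(Set<String> sourceIds) {
            this.pendingSources = new HashSet<>(sourceIds);
        }

        // Corresponds to WatermarkNotifiable.upToDate(watermarkId)
        void upToDate(String watermarkId) {
            pendingSources.remove(watermarkId);
            if (pendingSources.isEmpty()) {
                while (!bufferedCommands.isEmpty()) {
                    process(bufferedCommands.poll());                // flush buffered commands
                }
            }
        }

        void onCommand(String command) {
            if (pendingSources.isEmpty()) {
                process(command);                                    // recovered: process immediately
            } else {
                bufferedCommands.add(command);                       // still replaying: buffer
            }
        }

        private void process(String command) {
            processed.add(command);                                  // stand-in for real command handling
        }

        Queue<String> processedCommands() { return processed; }
    }
    ```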
    
  4. If implementing the infrastructure in 3 is not possible, the KafkaSource could accept a constructor parameter specifying a special watermark event that travels to the operators, but this would require every operator to depend on these watermarks and re-emit them.

  5. Another approach is to not process commands older than some criterion. For example, commands carry an entry timestamp, and commands older than the recovery start time are dropped. If time is used, clock synchronization is critical.
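
     Option 5 amounts to a simple filter, sketched below in plain Java (the `StalenessFilter` name is an assumption; note that clock skew between the command producer and this process makes the cutoff approximate):

     ```java
     // Rejects commands whose entry timestamp predates the moment this
     // instance started, so commands replayed from before a restart are
     // dropped instead of re-executed.
     class StalenessFilter {
         private final long startedAtMillis;

         StalenessFilter(long startedAtMillis) {
             this.startedAtMillis = startedAtMillis;
         }

         boolean isFresh(long commandEntryTimestampMillis) {
             return commandEntryTimestampMillis >= startedAtMillis;
         }
     }
     ```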

Related StackOverflow questions

  1. Using Kafka as a (CQRS) Eventstore. Good idea?
  2. Kafka - Know if Consumer is up to date
  3. Kafka & Flink duplicate messages on restart
user2108278

1 Answer


Create a new Commuter operator type. This is like a Source: it is made up of several sources representing the event and command topics. It starts in a "recovering" state, in which it reads from the event topics up to their latest offsets while buffering or dropping commands. Once up to date, it considers itself recovered and "opens" the way for commands. It could also be implemented separately as a source plus an operator.
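
A minimal sketch of that state machine in plain Java, without Flink dependencies (the `Commuter` field and method names are assumptions; `targetOffset` stands for the latest event offset recorded at startup):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Two-input operator: replays events until the startup offset is reached,
// buffering commands in the meantime, then opens the command path.
class Commuter {
    private final long targetOffset;                       // latest event offset at startup
    private boolean recovering = true;
    private final Queue<String> bufferedCommands = new ArrayDeque<>();
    private final Queue<String> output = new ArrayDeque<>();

    Commuter(long targetOffset) {
        this.targetOffset = targetOffset;
        if (targetOffset < 0) recovering = false;          // empty topic: nothing to replay
    }

    void onEvent(long offset, String event) {
        output.add(event);                                 // events always flow through
        if (recovering && offset >= targetOffset) {
            recovering = false;                            // caught up: open the command path
            while (!bufferedCommands.isEmpty()) {
                output.add(bufferedCommands.poll());
            }
        }
    }

    void onCommand(String command) {
        if (recovering) {
            bufferedCommands.add(command);                 // hold until recovered
        } else {
            output.add(command);
        }
    }

    Queue<String> output() { return output; }
}
```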

FlinkKafkaProducerXX is not enough to do this, but it would be the base to implement it.
