I'm trying to implement a simple CQRS/event sourcing proof of concept on top of Kafka Streams (as described in https://www.confluent.io/blog/event-sourcing-using-apache-kafka/)
I have 4 basic parts:

1. A `commands` topic, which uses the aggregate ID as the key, for sequential processing of commands per aggregate
2. An `events` topic, to which every change in aggregate state is published (again, keyed by the aggregate ID). This topic has a "never delete" retention policy
3. A KTable that reduces the aggregate state and saves it to a state store:

       events topic stream -> group into a KTable by aggregate ID -> reduce aggregate events to current state -> materialize as a state store

4. A commands processor: the `commands` stream, left joined with the aggregate-state KTable. For each entry in the resulting stream, a function `(command, state) => events` produces the resulting events and publishes them to the `events` topic
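The topology above might be sketched with the Kafka Streams DSL roughly like this. This is only a sketch of my intent: the topic names match the parts above, but the `String` types and the `apply`/`decide` placeholder functions are stand-ins for real serialization and domain logic:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class TopologySketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // events topic -> group by aggregate ID (the record key) -> reduce to current state,
        // materialized as a state store
        KTable<String, String> state = builder
                .<String, String>stream("events")
                .groupByKey()
                .reduce(TopologySketch::apply,
                        Materialized.as("aggregate-state-store"));

        // commands stream, left joined with the state KTable; (command, state) => events
        builder.<String, String>stream("commands")
                .leftJoin(state, TopologySketch::decide)
                .to("events");

        return builder.build();
    }

    // Placeholder fold: a real implementation would deserialize and apply the event to the state.
    static String apply(String currentState, String event) { return event; }

    // Placeholder command handler: a real implementation would validate the command against state.
    static String decide(String command, String currentState) { return "event-for-" + command; }
}
```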
The question is - is there a way to make sure I have the latest version of the aggregate in the state store?
I want to reject a command if it violates business rules (for example, a command to modify an entity is not valid if the entity was marked as deleted). But if a `DeleteCommand` is published and a `ModifyCommand` follows right after it, the delete command will produce the `DeletedEvent`, yet when the `ModifyCommand` is processed, the state loaded from the state store might not reflect that yet, and conflicting events will be published.
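To make the business rule concrete, here is a minimal, self-contained sketch of the `(command, state) => events` function I have in mind (all type and event names are hypothetical). The check itself is trivial; the problem is that the `state` argument can be stale when it comes from the joined KTable:

```java
import java.util.List;

public class CommandHandler {
    // Minimal aggregate state: just a deletion flag.
    record State(boolean deleted) {}

    // (command, state) => events; a null state means no events have been reduced yet.
    static List<String> decide(String command, State state) {
        if ("ModifyCommand".equals(command)) {
            if (state != null && state.deleted()) {
                // Business rule: modifying a deleted entity is invalid -> reject, emit nothing.
                return List.of();
            }
            return List.of("ModifiedEvent");
        }
        if ("DeleteCommand".equals(command)) {
            return List.of("DeletedEvent");
        }
        return List.of();
    }

    public static void main(String[] args) {
        // If the state store is up to date, the modify is rejected:
        System.out.println(decide("ModifyCommand", new State(true)));   // []
        // But if DeletedEvent has not been reduced into the store yet,
        // the stale state lets a conflicting event through:
        System.out.println(decide("ModifyCommand", new State(false)));  // [ModifiedEvent]
    }
}
```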
I don't mind sacrificing command-processing throughput; I'd rather have the consistency guarantees (since everything is keyed by the same aggregate ID and should end up in the same partition).
Hope that was clear :) Any suggestions?