6

I'm trying to implement a simple CQRS/event sourcing proof of concept on top of Kafka Streams (as described in https://www.confluent.io/blog/event-sourcing-using-apache-kafka/)

I have 4 basic parts:

  1. commands topic, which uses the aggregate ID as the key for sequential processing of commands per aggregate
  2. events topic, to which every change in aggregate state is published (again, the key is the aggregate ID). This topic has a retention policy of "never delete"
  3. A KTable that reduces the aggregate's events into its current state and saves it to a state store:

    events topic stream ->
    group to a KTable by aggregate ID ->
    reduce aggregate events to current state ->
    materialize as a state store
    
  4. commands processor - a commands stream, left-joined with the aggregate state KTable. For each entry in the resulting stream, use a function (command, state) => events to produce the resulting events and publish them to the events topic (see the sketch after this list)
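
For reference, here is roughly how I wired up parts 3 and 4 with the Streams DSL. The topic names, the Event/Command/AggregateState types, and the handle function are placeholders from my project; serdes and configuration are omitted:

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // (3) Fold the events topic into a KTable holding the current state per aggregate
        KTable<String, AggregateState> stateTable = builder
                .<String, Event>stream("events")
                .groupByKey()
                .aggregate(
                        AggregateState::empty,                             // initial (empty) state
                        (aggregateId, event, state) -> state.apply(event), // fold one event into the state
                        Materialized.<String, AggregateState, KeyValueStore<Bytes, byte[]>>as(
                                "aggregate-state-store"));

        // (4) Left-join the commands stream with the state table; handle(command, state)
        // returns the events the command produced, published back to the events topic.
        // Note: state is null when no events exist yet for the aggregate.
        builder.<String, Command>stream("commands")
                .leftJoin(stateTable, (command, state) -> handle(command, state))
                .flatMapValues(events -> events) // List<Event> -> one record per event
                .to("events");

        return builder;
    }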

The question is - is there a way to make sure I have the latest version of the aggregate in the state store?

I want to reject a command if it violates business rules (for example, a command to modify an entity is not valid if the entity was marked as deleted). But if a DeleteCommand is published with a ModifyCommand right after it, the DeleteCommand will produce the DeletedEvent, yet when the ModifyCommand is processed, the state loaded from the state store might not reflect the deletion yet, and conflicting events will be published.

I don't mind sacrificing command processing throughput; I'd rather get the consistency guarantees (since everything is grouped by the same key and should end up in the same partition).

Hope that was clear :) Any suggestions?

amitayh

3 Answers

7

I don't think Kafka is a good fit for CQRS and event sourcing yet, at least the way you described it, because it lacks a (simple) way of ensuring protection from concurrent writes. This article talks about this in detail.

What I mean by "the way you described it" is the fact that you expect a command to generate zero or more events, or to fail with an exception; this is the classical CQRS with event sourcing. Most people expect this kind of architecture.

You could, however, have event sourcing in a different style. Your command handlers could yield events for every command that is received (e.g. DeleteWasAccepted). Then, an event handler could eventually handle that event in an event-sourced way (by rebuilding the aggregate's state from its event stream) and emit other events (e.g. ItemDeleted or ItemDeletionWasRejected). So commands are fire-and-forget, sent asynchronously; the client does not wait for an immediate response. It waits, however, for an event describing the outcome of its command execution.

An important aspect is that the event handler must process events from the same aggregate serially (exactly once and in order). This can be implemented using a single Kafka consumer group. You can see more about this architecture in this video.
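
A rough sketch of what such an event handler could look like (the EventStore, the domain types, and the wiring are all hypothetical, only meant to illustrate the flow):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class DeletionEventHandler {
        private final EventStore eventStore;                  // hypothetical event store
        private final KafkaProducer<String, Event> producer;

        DeletionEventHandler(EventStore eventStore, KafkaProducer<String, Event> producer) {
            this.eventStore = eventStore;
            this.producer = producer;
        }

        // Invoked serially per aggregate: a single consumer group guarantees each
        // partition (and therefore each aggregate) is handled in order
        void on(DeleteWasAccepted accepted) {
            // Rebuild the aggregate's state from its event stream (event-sourced load)
            Aggregate aggregate = Aggregate.initial();
            for (Event event : eventStore.loadEvents(accepted.aggregateId())) {
                aggregate = aggregate.apply(event);
            }

            // Decide the outcome and publish it as a new event
            Event outcome = aggregate.isDeleted()
                    ? new ItemDeletionWasRejected(accepted.aggregateId())
                    : new ItemDeleted(accepted.aggregateId());
            producer.send(new ProducerRecord<>("events", accepted.aggregateId(), outcome));
        }
    }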

Constantin Galbenu
1

Please read this article by my colleague Jesper. Kafka is a great product, but actually not a good fit at all for event sourcing:

https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c

  • 3
    I read it, and he brings up good points. However, they're not applicable to the design I described, which utilizes Kafka Streams: loading current state is done using the KTable; consistent writes are handled by Kafka's partition model and fault-tolerance guarantees – amitayh Mar 24 '18 at 06:13
0

A possible solution I came up with is to implement a sort of optimistic locking mechanism:

  1. Add an expectedVersion field on the commands
  2. Use the KTable Aggregator to increase the version of the aggregate snapshot for each handled event
  3. Reject commands if the expectedVersion doesn't match the snapshot's aggregate version

This seems to provide the semantics I'm looking for.
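
In code, the check could look something like this (all names are hypothetical; the first function is the (command, state) => events handler from the question, the second is the KTable aggregator):

    import java.util.List;

    // (1) + (3): reject the command when the client's expectedVersion is stale.
    // The snapshot is null when no events exist yet for the aggregate.
    List<Event> handle(Command command, AggregateSnapshot snapshot) {
        long currentVersion = (snapshot == null) ? 0L : snapshot.version();
        if (command.expectedVersion() != currentVersion) {
            return List.of(new CommandRejected(command.aggregateId(), currentVersion));
        }
        return command.execute(snapshot); // business rules produce the resulting events
    }

    // (2): the KTable aggregator bumps the snapshot version for every handled event
    AggregateSnapshot aggregate(String aggregateId, Event event, AggregateSnapshot snapshot) {
        return snapshot.apply(event).withVersion(snapshot.version() + 1);
    }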

amitayh
  • 1
    It's nice that you found what you want, but this is not how optimistic locking works. The command should be retried, not rejected. – Constantin Galbenu Mar 22 '18 at 08:43
  • Retrying a command has nothing to do with optimistic locking... It's true that the optimistic locking is not done in the transaction manager of a database, but the observed effect is the same - if the expected version doesn't match, the changes will not be persisted. Since Kafka processes messages sequentially per partition, there should not be concurrent writes https://en.wikipedia.org/wiki/Optimistic_concurrency_control – amitayh Mar 24 '18 at 06:10
  • And what do your clients see if the command is rejected because of a concurrent write, an error? – Constantin Galbenu Mar 24 '18 at 07:35
  • You fail valid commands because the infrastructure or the architecture does not fit. – Constantin Galbenu Mar 24 '18 at 08:25
  • Depends on how you define "valid". If the client is working on a stale version, you can choose to reject the command and ask the client to try again. Or, you can choose to retry the command automatically by re-adding it to the commands topic with the current version. It's actually quite simple – amitayh Mar 24 '18 at 10:33
  • I don't understand how this works: your Aggregate emits events, they are appended to the Events-topic and then the command is rejected/retried? – Constantin Galbenu Mar 24 '18 at 12:48
  • What if another command (maybe by another faster process) appends some events AFTER the version check, and BEFORE appending the events of the current command? – Mohsen Jan 01 '19 at 10:31
  • @Mohsen It's not possible, since commands to a single aggregate will be processed serially, not in parallel. Commands are being pulled off a command queue which is partitioned by aggregate id. – TomW Jul 22 '19 at 12:48
  • I don't understand how you made this work. How did you link the version numbers between the command and the aggregate? I mean, a command comes in with, say, v2; the aggregate was v1, now bumped to v2; step 3 executes - all good. But how did you determine the expected version of the command? – zooes Feb 27 '20 at 18:24
  • Like zooes said, how did you achieve this? How can you determine an expected version for the command? – tuna Jul 06 '21 at 01:04