
I've been studying DDD for a while, and stumbled upon design patterns like CQRS and Event Sourcing (ES). These patterns can help achieve some of the concepts of DDD with less effort.

Then I started to develop a simple piece of software to implement all these concepts, and started to imagine possible failure paths.

Just to clarify my architecture, the following image describes one request coming from the front end and reaching the controller in the back end (for simplicity I've ignored all filters and binders).

Sequence Diagram

  1. The actor submits a form with the amount of money to withdraw from an account.
  2. The controller passes the view model (VM) to the application layer, where it will be transformed into a command.
  3. The application layer opens a Unit of Work (UOW), maps the VM to the command, and sends the command to the dispatcher.
  4. The dispatcher finds the aggregate class that knows how to handle the command (account) and asks the factory for the specific instance of the account.
  5. The factory creates a new instance of the account and requests all of its events from the event store.
  6. The event store returns all events of the account.
  7. The factory applies all events to the aggregate so that its internal state is correct, and returns the instance of the account.
  8. The dispatcher sends the command to the account so it can be handled.
  9. The account checks whether it has enough money for the withdrawal. If it does, it raises a new event, "MoneyWithdrawnEvent".
  10. This event is handled by the aggregate (account), altering its internal state.
  11. The application layer closes the UOW. On closing, the UOW checks whether any loaded aggregate has new events to save to the event store. If so, it sends the events to the repository.
  12. The repository persists the events to the event store.
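To make steps 5 to 10 concrete, here is a minimal sketch of the aggregate side in Python. It is only an illustration under assumptions: `MoneyDepositedEvent`, `InsufficientFunds`, `from_history` and `new_events` are hypothetical names that do not come from the diagram, and the dispatcher, factory and UOW are left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MoneyDepositedEvent:  # hypothetical event, added so the account can have a balance
    amount: int


@dataclass(frozen=True)
class MoneyWithdrawnEvent:
    amount: int


class InsufficientFunds(Exception):
    pass


class Account:
    def __init__(self, account_id):
        self.account_id = account_id
        self.balance = 0
        self.new_events = []  # raised since loading, not yet persisted (read by the UOW in step 11)

    @classmethod
    def from_history(cls, account_id, history):
        # Steps 5-7: the factory replays the stored events to rebuild the state.
        account = cls(account_id)
        for event in history:
            account._apply(event)
        return account

    def withdraw(self, amount):
        # Step 9: check the invariant, then raise the event.
        if amount > self.balance:
            raise InsufficientFunds(f"balance {self.balance} < {amount}")
        event = MoneyWithdrawnEvent(amount)
        self._apply(event)  # step 10: the event alters the internal state
        self.new_events.append(event)

    def _apply(self, event):
        if isinstance(event, MoneyDepositedEvent):
            self.balance += event.amount
        elif isinstance(event, MoneyWithdrawnEvent):
            self.balance -= event.amount


history = [MoneyDepositedEvent(100), MoneyWithdrawnEvent(30)]
account = Account.from_history("acc-1", history)
account.withdraw(50)
```

After the command is handled, `account.new_events` holds only the events that still have to be appended to the event store in step 12.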

There are many layers that can be added, for example: caching of aggregates, caching of events, snapshots, etc.

Sometimes ES is used in parallel with a relational database, so that when the UOW saves the new events, it also persists the aggregates to the relational database.

One of the benefits of ES is that it has one central source of truth, the event store. So, even if the models in memory or in the relational database get corrupted, we can rebuild the model from the events.

And having this source of truth, we can build other systems that can consume the events in a different way to form a different model.

However, for this to work, we need the source of truth to be clean and not corrupted. Otherwise all these benefits won't exist.

That said, if we consider concurrency in the architecture described in the image, there can be some issues:

  • If the actor sends the form twice to the back end in a short period, and the back end starts two threads (one for each request), then the application layer is called twice, two UOWs are started, and so on. This can cause two events to be stored in the event store, even if the balance only covers one of them.
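The interleaving described above can be reproduced deterministically without threads. The sketch below is an assumption-laden toy: the event store is a plain Python list with an unconditional append, and events are `(type, amount)` tuples.

```python
# Naive event store: a plain list with an unconditional append (assumption for illustration).
store = [("MoneyDeposited", 100)]


def balance(events):
    total = 0
    for kind, amount in events:
        total += amount if kind == "MoneyDeposited" else -amount
    return total


def load():
    # Each request gets its own snapshot of the history (steps 5-7).
    return list(store)


def withdraw(history, amount):
    # The decision is made against the snapshot, which may already be stale.
    if balance(history) < amount:
        raise ValueError("insufficient funds")
    return [("MoneyWithdrawn", amount)]


# Two requests interleave: both load before either one saves.
history_a = load()
history_b = load()
events_a = withdraw(history_a, 100)  # passes: snapshot balance is 100
events_b = withdraw(history_b, 100)  # also passes: same stale snapshot
store.extend(events_a)
store.extend(events_b)

final_balance = balance(store)
```

Both withdrawals are accepted, so the stored history now describes an account that was overdrawn even though each request checked the balance.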

This problem can be handled in many different places:

  1. The front End can control which user/actor can do what action and how many times.

  2. The Dispatcher can keep a cache of all commands that are being handled, and if there is already a command that refers to the same aggregate (account), it throws an exception.

  3. The Repository can create a new instance of the aggregate and replay all events from the event store just before saving, to check whether the version is still the same as the one fetched in step 7.

Problems with each solution:

  1. Front End

    • The user can bypass these constraints by editing some JavaScript.
    • If there are multiple sessions open (e.g. different browsers), it would be necessary to have some static field holding references to all open sessions, and to lock some static variable to access this field.
    • If there are multiple servers for the specific action being performed (horizontal scaling), this static field would not work, because it would have to be shared across all servers. So, some extra layer would be necessary (e.g. Redis).
  2. Command Cache

    • For this solution to work, it would be necessary to lock some static variable of the cache of commands when reading and when writing to it.

    • If there are multiple Servers for the specific use case of the application layer being performed (horizontal scaling), this static cache would not work, because it would be necessary to share this across all servers. So, some layer would be necessary (e.g. Redis).

  3. Repository Version check

    • For this solution to work, it would be necessary to lock some static variable before doing the check (version of Database equals the version fetched in step 7) and saving.

    • If the system were distributed (horizontal scaling), it would be necessary to lock the event store. Otherwise, both processes could pass the check (version in the database equals the version fetched in step 7), and then one saves and then the other saves. Depending on the technology, it is not possible to lock the event store, so there would have to be another layer to serialize every access to the event store and add the possibility of locking it.

The solutions that lock a static variable are somewhat OK, because the lock is local to the process and very fast. However, depending on something like Redis adds significant latency, even more so if we are talking about locking access to a database (the event store), and more still if this has to be done through another service.

I would like to know if there is any other possible solution to this problem, because it is a major one (corruption of the event store), and if there is no way around it the whole concept seems to be flawed.

I'm open to any change in the architecture. If for instance, one solution is to add one event Bus, so that everything gets funneled through it, it's fine, but I can't see this solving the problem.

Another point that I'm not familiar with is Kafka. I don't know whether Kafka provides some solution to this problem.

  • Unrelated: you may want to consider more carefully where your boundaries are. Your `domain model` should know how to take an event history and a `withdraw command` and create new events. That's your functional core -- all of the plumbing should be outside of that. – VoiceOfUnreason Mar 26 '18 at 22:19
  • You may also want to consider if you need steps 1 + 2. Why not just create a command rather than a view model and derive a command. This will reduce the amount and complexity of the code you need to write to get the same testable result. – Codescribler Mar 27 '18 at 08:27
  • Hi @VoiceOfUnreason, In the architecture described in the picture, the Aggregate is the class that handles Commands and Events (and Event history). And the Aggregate is in the Domain Layer. – Rodrigo Riskalla Leal Mar 27 '18 at 13:59
  • Hi @Codescribler, I could remove the Application Layer, so that the controller would be responsible for mapping the view model into a command. But I preferred to include it, so the code can be more easily tested, and the controller gets lighter (less code). – Rodrigo Riskalla Leal Mar 27 '18 at 14:02
  • My point is, why map anything? It's extra complexity for no real gain. – Codescribler Mar 27 '18 at 14:03
  • I see @Codescribler . But how would you dispatch the command if you only have the View model that came from the front end to the controller? Would you change, so that the front end sends a command directly to the controller? – Rodrigo Riskalla Leal Mar 27 '18 at 14:25
  • I wouldn't send a view model at all. I often just send the info needed to form the command back to the controller. This hugely simplifies things and reduces the amount of code needed. – Codescribler Mar 27 '18 at 14:49

2 Answers


Although all the solutions that you provided could work in some specific scenarios, I think the last solution (3.2) works for the more general use case. I use it in my open source framework and it works very well.

So, the Event store is the one responsible for ensuring that an Aggregate is not mutated at the same time by two commands.

One way of doing it is to use optimistic locking. When the Aggregate is loaded from the Event store, you remember its version. When you persist the events, you try to append them with the version + 1. You must have a unique index on AggregateType-AggregateId-version, so that the second of two concurrent appends with the same version is rejected. If the append fails, you should retry the whole process (load+handle+append).
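A minimal sketch of this optimistic-locking scheme, using SQLite from the Python standard library as a stand-in event store. The table layout, the `ConcurrencyError` exception and the `load`, `append` and `withdraw` helpers are assumptions made for illustration, not the API of any particular framework.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE events (
           aggregate_type TEXT NOT NULL,
           aggregate_id   TEXT NOT NULL,
           version        INTEGER NOT NULL,
           payload        TEXT NOT NULL,
           UNIQUE (aggregate_type, aggregate_id, version)
       )"""
)


class ConcurrencyError(Exception):
    pass


def load(aggregate_type, aggregate_id):
    rows = conn.execute(
        "SELECT version, payload FROM events "
        "WHERE aggregate_type = ? AND aggregate_id = ? ORDER BY version",
        (aggregate_type, aggregate_id),
    ).fetchall()
    events = [json.loads(payload) for _, payload in rows]
    version = rows[-1][0] if rows else 0  # remember the version that was loaded
    return events, version


def append(aggregate_type, aggregate_id, loaded_version, event):
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events VALUES (?, ?, ?, ?)",
                (aggregate_type, aggregate_id, loaded_version + 1, json.dumps(event)),
            )
    except sqlite3.IntegrityError as exc:  # the unique index rejected a duplicate version
        raise ConcurrencyError("version already taken") from exc


def balance(events):
    return sum(e["amount"] if e["type"] == "MoneyDeposited" else -e["amount"] for e in events)


def withdraw(account_id, amount, max_attempts=3):
    # Retry the whole process: load + handle + append.
    for _ in range(max_attempts):
        events, version = load("account", account_id)
        if balance(events) < amount:
            raise ValueError("insufficient funds")
        try:
            append("account", account_id, version, {"type": "MoneyWithdrawn", "amount": amount})
            return version + 1
        except ConcurrencyError:
            continue  # someone else appended first; reload and decide again
    raise ConcurrencyError(f"gave up after {max_attempts} attempts")


append("account", "acc-1", 0, {"type": "MoneyDeposited", "amount": 100})
_, version_a = load("account", "acc-1")  # both requests load version 1
_, version_b = load("account", "acc-1")
append("account", "acc-1", version_a, {"type": "MoneyWithdrawn", "amount": 100})
try:
    append("account", "acc-1", version_b, {"type": "MoneyWithdrawn", "amount": 100})
    second_write = "saved"
except ConcurrencyError:
    second_write = "rejected"

try:
    withdraw("acc-1", 100)  # the retry path sees the fresh history
    retry_result = "saved"
except ValueError:
    retry_result = "insufficient funds"
```

The second append is rejected by the index rather than by application code, and a retry re-evaluates the command against the up-to-date history, so the duplicated withdrawal fails the balance check instead of being stored.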

I think this is the most scalable solution, as it works even with sharding, when the sharding key is a subset of AggregateId.

You could easily use MongoDB as an EventStore. In MongoDB <= 3.6 you could append all the events atomically by inserting a single document with a nested document containing the array of events.
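The idea of one document per commit can be sketched without a database. In the toy below, `CommitCollection` is a hypothetical in-memory stand-in for a collection with a unique index on (aggregate type, aggregate id, version); it is not the MongoDB driver API, and its duplicate check is not thread-safe the way a real index is.

```python
class DuplicateKeyError(Exception):
    """Raised when a commit with the same (type, id, version) already exists."""


class CommitCollection:
    # Hypothetical stand-in: a real store would enforce the unique index atomically.
    def __init__(self):
        self._docs = {}

    def insert_one(self, doc):
        key = (doc["aggregate_type"], doc["aggregate_id"], doc["version"])
        if key in self._docs:
            raise DuplicateKeyError(key)
        self._docs[key] = doc

    def events_for(self, aggregate_type, aggregate_id):
        commits = sorted(
            (d for d in self._docs.values()
             if d["aggregate_type"] == aggregate_type and d["aggregate_id"] == aggregate_id),
            key=lambda d: d["version"],
        )
        events = [event for commit in commits for event in commit["events"]]
        version = commits[-1]["version"] if commits else 0
        return events, version


def commit(collection, aggregate_id, loaded_version, events):
    # One document per commit: the version counts commits, not individual events.
    collection.insert_one({
        "aggregate_type": "account",
        "aggregate_id": aggregate_id,
        "version": loaded_version + 1,
        "events": events,
    })


commits = CommitCollection()
commit(commits, "acc-1", 0, [{"type": "MoneyDeposited", "amount": 500}])

_, version_a = commits.events_for("account", "acc-1")  # both writers load version 1
_, version_b = commits.events_for("account", "acc-1")

# Writer A produces one event, writer B produces two; both target version 2.
commit(commits, "acc-1", version_a, [{"type": "MoneyWithdrawn", "amount": 100}])
try:
    commit(commits, "acc-1", version_b, [
        {"type": "MoneyWithdrawn", "amount": 100},
        {"type": "MoneyDeposited", "amount": 100},
    ])
    second_commit = "saved"
except DuplicateKeyError:
    second_commit = "rejected"

events, version = commits.events_for("account", "acc-1")
```

Because the version is attached to the commit, a command that emits two events competes for the same slot as one that emits a single event, and only one of them is stored.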

Another solution is to use pessimistic locking. You start a transaction before you load the Aggregate, append the events, increase its version and commit. You need to use 2 tables/collections, one for the Aggregate metadata+version and one for the actual events. MongoDB >= 4.0 has transactions.
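As a rough sketch of the pessimistic variant, the example below uses SQLite from the Python standard library instead of MongoDB, with one table for the aggregate version and one for the events. `BEGIN IMMEDIATE` takes the write lock before the aggregate is loaded; the `handle`, `deposit` and `withdraw` helpers and the table layout are assumptions for illustration. With an in-memory database there is only one connection, so the example shows the transaction shape rather than two writers actually blocking each other.

```python
import json
import sqlite3

# isolation_level=None: the module does not open transactions implicitly,
# so BEGIN / COMMIT / ROLLBACK are issued explicitly below.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE aggregates (aggregate_id TEXT PRIMARY KEY, version INTEGER NOT NULL)")
conn.execute(
    "CREATE TABLE events (aggregate_id TEXT NOT NULL, version INTEGER NOT NULL, payload TEXT NOT NULL)"
)


def balance(history):
    return sum(e["amount"] if e["type"] == "MoneyDeposited" else -e["amount"] for e in history)


def deposit(amount):
    return lambda history: [{"type": "MoneyDeposited", "amount": amount}]


def withdraw(amount):
    def decide(history):
        if balance(history) < amount:
            raise ValueError("insufficient funds")
        return [{"type": "MoneyWithdrawn", "amount": amount}]
    return decide


def handle(aggregate_id, decide):
    conn.execute("BEGIN IMMEDIATE")  # acquire the write lock before loading
    try:
        row = conn.execute(
            "SELECT version FROM aggregates WHERE aggregate_id = ?", (aggregate_id,)
        ).fetchone()
        version = row[0] if row else 0
        history = [
            json.loads(payload)
            for (payload,) in conn.execute(
                "SELECT payload FROM events WHERE aggregate_id = ? ORDER BY version, rowid",
                (aggregate_id,),
            )
        ]
        for event in decide(history):  # handle the command
            conn.execute(
                "INSERT INTO events VALUES (?, ?, ?)",
                (aggregate_id, version + 1, json.dumps(event)),
            )
        conn.execute("INSERT OR REPLACE INTO aggregates VALUES (?, ?)", (aggregate_id, version + 1))
        conn.execute("COMMIT")  # releases the lock
        return version + 1
    except Exception:
        conn.execute("ROLLBACK")  # nothing is stored if the command is rejected
        raise


handle("acc-1", deposit(100))
handle("acc-1", withdraw(100))
try:
    handle("acc-1", withdraw(100))
    third = "saved"
except ValueError:
    third = "rejected"

stored_version = conn.execute(
    "SELECT version FROM aggregates WHERE aggregate_id = 'acc-1'"
).fetchone()[0]
event_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

The trade-off compared with optimistic locking is that the lock is held for the whole load, handle and append cycle, so concurrent commands wait instead of retrying.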

In both these solutions, the Event store does not get corrupted.

> Other point that I'm not familiar with is Kafka. I don’t know if there is some solution that Kafka provides for this problem.

You can use Kafka with Event sourcing but you need to change your architecture. See this answer.

Constantin Galbenu
  • Nice approach. Using one index for “AggregateType-AggregateId-version” may solve the issue, because if two processes add +1 to the version, the second won’t be able to save. However, if the command given by one of the processes generates two events and the other only one, we would be back to the same problem. For instance, one command is withdraw $100, and the other is transfer $100 from the account to the same account (I know this is not a real use case). The first adds one entry in the ledger and the second adds two (debit and income). – Rodrigo Riskalla Leal Mar 27 '18 at 14:23
  • @RodrigoRiskallaLeal not quite, it works because you have only one document for all events (one document per commit). – Constantin Galbenu Mar 27 '18 at 14:31
  • @RodrigoRiskallaLeal see here: https://github.com/xprt64/mongolina/blob/master/src/Mongolina/EventsCommit.php – Constantin Galbenu Mar 27 '18 at 14:34
  • @RodrigoRiskallaLeal and this is the event store: https://github.com/xprt64/mongolina/blob/master/src/Mongolina/MongoEventStore.php – Constantin Galbenu Mar 27 '18 at 14:35
  • Yes, if we use a document-based solution like MongoDB for the Event store, we can have one document with all events. But this only hides the problem, because now, if two concurrent processes try to write, the first will save and then the second process will overwrite the whole document and save it with its own events. For example, one withdraws $100 and the other $200; if the $200 commit arrives after the first, then the MoneyWithdrawn $100 event will be lost. – Rodrigo Riskalla Leal Mar 27 '18 at 15:02
  • The idea of incrementing +1 not per event but per commit (as in the code you just sent) can be used to mitigate the problem I just described of one process generating one event and the other generating two. However, some changes to the Aggregate would be necessary, because it calculates its version based on the number of events, and now we would calculate it based on commands… – Rodrigo Riskalla Leal Mar 27 '18 at 15:02
  • @RodrigoRiskallaLeal you don't understand, the operation is always APPEND, how could anything ever get overwritten in an APPEND ONLY system?? – Constantin Galbenu Mar 27 '18 at 15:05
  • @RodrigoRiskallaLeal also, the Aggregate never EVER computes its version, nor does it know or even care about something like that; this is purely an Infrastructure concern, not Domain. `Version` MUST be hidden from the Aggregate, which should be a POJO/POPO, no infrastructure whatsoever. – Constantin Galbenu Mar 27 '18 at 15:08
  • Yes, I understand that ES requires append only, and now that I've looked closer, I've noticed that the method insertOne is not using _id to overwrite. However, in this scenario (concurrency) it's possible that two processes save one EventsCommit each. And there would be two entries in MongoDB, one with {aggregate type; Aggregate Id ; Withdraw $100, Version: 11} and the other with {aggregate type; Aggregate Id ; Withdraw $200, Version: 11}. So we would have two entries with the same version in the DB. To fix this, we would need to add some uniqueness constraint on the DB or some lock in the code. – Rodrigo Riskalla Leal Mar 28 '18 at 00:20
  • @RodrigoRiskallaLeal that situation is not possible thanks to the unique index `AggregateType-AggregateId-version` – Constantin Galbenu Mar 28 '18 at 03:24
  • @RodrigoRiskallaLeal BTW, it's very good that you question every thing, it's the best way to learn! – Constantin Galbenu Mar 28 '18 at 03:37

Short answer: atomic transactions are still a thing.

Longer answer: to handle concurrent writes correctly, you either need a lock, or you need conditional writes (aka compare and swap).

Using a lock: we would need to acquire the lock before step 6, and release the lock after step 12.

Using a conditional write: at step 6, the repository would capture a concurrency predicate (which could be implicit -- for instance, the count of events read). When performing the write at step 12, the concurrency predicate would be checked to ensure that there had been no concurrent modifications.
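A minimal sketch of a conditional write, assuming a hypothetical in-memory store whose `append` takes the expected event count as the concurrency predicate. The store performs the check and the write as one atomic step; the `threading.Barrier` only exists to force both requests to read before either one writes.

```python
import threading


class WrongExpectedVersion(Exception):
    pass


class InMemoryEventStore:
    # Hypothetical store, for illustration only.
    def __init__(self):
        self._streams = {}
        self._lock = threading.Lock()  # internal to the store, invisible to callers

    def read(self, stream_id):
        with self._lock:
            return list(self._streams.get(stream_id, []))

    def append(self, stream_id, events, expected_version):
        with self._lock:  # check and write form one atomic step
            stream = self._streams.setdefault(stream_id, [])
            if len(stream) != expected_version:
                raise WrongExpectedVersion(
                    f"expected {expected_version}, found {len(stream)}"
                )
            stream.extend(events)


store = InMemoryEventStore()
store.append("acc-1", [("MoneyDeposited", 100)], expected_version=0)

barrier = threading.Barrier(2)
outcomes = []  # list.append is thread-safe in CPython


def request(amount):
    history = store.read("acc-1")  # step 6: capture the predicate
    expected = len(history)        # implicit predicate: the count of events read
    barrier.wait()                 # both requests have read before either writes
    try:
        store.append("acc-1", [("MoneyWithdrawn", amount)], expected)  # step 12
        outcomes.append("saved")
    except WrongExpectedVersion:
        outcomes.append("rejected")


threads = [threading.Thread(target=request, args=(100,)) for _ in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Whichever thread writes first wins; the other one fails the predicate check, so exactly one withdrawal is stored regardless of scheduling.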

For example, the HTTP API for Event Store uses the ES-ExpectedVersion header; the client is responsible for computing (from the events that it has fetched) where it expects the write to occur.

Gabriel Schenker describes both an RDBMS repository and an event store repository in his 2015 essay Event Sourcing applied -- the Repository.

Of course, with the introduction of the conditional write, you should give thought to what you want the model to do when the write fails. You might introduce a retry strategy (go to step 6), or try a merge strategy, or simply fail and return to sender.

> In your example of conditional write, I assume that there would be necessary to add a Lock in step 11 (so that it locks the event store to fetch the concurrency predicate). And release the lock only after writing the new events to the Event Store. Otherwise, two concurrent processes could pass the concurrency predicate check and save the events.

Not necessarily.

If your persistence store provides locks, but not conditional writes, then you have the right idea: in step 12, the repository would acquire a lock, check the precondition, commit the new events, and release the lock.
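For a store that offers locks but no conditional writes, the repository does the check itself. The sketch below assumes a hypothetical `LockingStore` with per-stream locks and a blind `append`; the names and the in-process `threading.Lock` are stand-ins for whatever locking primitive the real store provides.

```python
import threading
from collections import defaultdict


class ConcurrencyError(Exception):
    pass


class LockingStore:
    """Hypothetical store: per-stream locks and blind appends, no conditional writes."""

    def __init__(self):
        self._streams = defaultdict(list)
        self._locks = defaultdict(threading.Lock)
        self._guard = threading.Lock()  # protects creation of the per-stream locks

    def lock_for(self, stream_id):
        with self._guard:
            return self._locks[stream_id]

    def read(self, stream_id):
        return list(self._streams[stream_id])

    def append(self, stream_id, events):
        self._streams[stream_id].extend(events)


class AccountRepository:
    def __init__(self, store):
        self._store = store

    def load(self, account_id):
        history = self._store.read(account_id)
        return history, len(history)  # the event count is the version

    def save(self, account_id, new_events, expected_version):
        with self._store.lock_for(account_id):            # acquire the lock
            current = len(self._store.read(account_id))   # check the precondition
            if current != expected_version:
                raise ConcurrencyError(f"expected {expected_version}, found {current}")
            self._store.append(account_id, new_events)    # commit the new events
        # the lock is released when the with-block exits


repo = AccountRepository(LockingStore())
repo.save("acc-1", [("MoneyDeposited", 100)], expected_version=0)

_, version_a = repo.load("acc-1")  # both requests load version 1
_, version_b = repo.load("acc-1")
repo.save("acc-1", [("MoneyWithdrawn", 100)], version_a)
try:
    repo.save("acc-1", [("MoneyWithdrawn", 100)], version_b)
    second_save = "saved"
except ConcurrencyError:
    second_save = "rejected"

final_history, final_version = repo.load("acc-1")
```

The lock is held only around the check and the write in step 12, which is enough to keep the stored history consistent; the rejected request still has to be retried or failed by the caller.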

But a persistence appliance that understands conditional writes can implement that check for you. Using event store, the repository doesn't need to acquire a lock. It sends the events with metadata about the expected state to the store. The event store itself uses that information to perform the conditional write.

There's no magic - somebody needs to do the work to ensure that concurrent writes don't clobber each other. But it doesn't necessarily have to be in your code.

Note that I'm using "Repository" as described by Eric Evans in the blue book - it's the abstraction that hides your choice of how to store events from the rest of the system; in other words, it's the adapter that makes your event store look like an in-memory collection of events -- it's not the event store itself.

VoiceOfUnreason
  • In your example of conditional write, I assume that it would be necessary to add a lock in step 11 (so that it locks the event store to fetch the concurrency predicate), and release the lock only after writing the new events to the Event Store. Otherwise, two concurrent processes could pass the concurrency predicate check and save the events. – Rodrigo Riskalla Leal Mar 27 '18 at 14:10
  • I understand that locking from step 6 through 12 is required if we want the process to behave correctly, because this way the second process will try to withdraw the money and possibly won't have enough credit. However, if we just want our event store to be correct (not corrupted), then locking only from step 11 to 12 is enough. Correct? – Rodrigo Riskalla Leal Mar 27 '18 at 14:36