
I've been considering using Apache Kafka as the event store in an event sourcing configuration. The published events will be associated with specific resources, delivered to a topic associated with the resource type, and sharded into partitions by resource id. So, for instance, the creation of a resource of type Folder with id 1 would produce a FolderCreate event delivered to the "folders" topic, in the partition obtained by hashing the id 1 across the total number of partitions in the topic. However, I don't know how to handle concurrent events that make the log inconsistent.
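For illustration, here's a minimal sketch of how I picture the producer side (the broker address and the JSON payload are placeholders; keying by the resource id is what keeps all events for one folder in the same partition):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class FolderEvents {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // The default partitioner hashes the key, so every event for folder 1
            // lands in the same partition of the "folders" topic.
            producer.send(new ProducerRecord<>(
                    "folders", "1", "{\"type\":\"FolderCreate\",\"id\":1}"));
        }
    }
}
```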

The simplest scenario would be two concurrent actions that can invalidate each other, such as one that updates a folder and one that destroys that same folder. In that case the partition for that topic could end up containing the invalid sequence [FolderDestroy, FolderUpdate]. That situation is often handled by versioning the events, as explained here, but Kafka does not support such a feature.

What can be done to ensure the consistency of the Kafka log itself in those cases?

Jesuspc

1 Answer


I think it's probably possible to use Kafka for event sourcing of aggregates (in the DDD sense), or 'resources'. Some notes:

  1. Serialise writes per partition, using a single process per partition (or set of partitions) to manage this. Ensure you send messages serially down the same Kafka connection, and use acks=all before reporting success to the command sender if you can't afford rollbacks. Have the producer process keep track of the current successful event offset/version for each resource, so it can do the optimistic check itself before sending the message (see the sketch after this list).
  2. Since a write failure can be reported even when the write actually succeeded, you need to retry writes and handle deduplication, e.g. by including an ID in each event, or reinitialise the producer by re-reading (recent messages in) the stream to see whether the write actually worked.
  3. Writing multiple events atomically - just publish a composite event containing a list of events.
  4. Lookup by resource id: this can be achieved by reading all events from a partition at startup (or all events from a particular cross-resource snapshot) and keeping the current state either in RAM or cached in a DB.
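A minimal sketch of points 1 and 2, assuming a single writer process owns the partition; the class name, the `tryAppend` method and the in-memory version map are illustrative, not part of any Kafka API:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

// Hypothetical single-writer for one partition: serialises commands,
// does the optimistic version check itself, and tags events for deduplication.
public class SingleWriter {
    private final Producer<String, String> producer;
    // Last successfully written version per resource id, tracked by this process.
    private final Map<String, Long> currentVersion = new HashMap<>();

    public SingleWriter(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // don't report success until fully replicated
        this.producer = new KafkaProducer<>(props);
    }

    /** Appends an event if the caller's expected version matches the writer's view. */
    public synchronized boolean tryAppend(String resourceId, long expectedVersion, String eventJson) {
        long version = currentVersion.getOrDefault(resourceId, 0L);
        if (version != expectedVersion) {
            return false; // optimistic concurrency check failed, reject the command
        }
        String eventId = UUID.randomUUID().toString(); // dedup key in case the write is retried
        String payload = "{\"eventId\":\"" + eventId + "\",\"version\":" + (version + 1)
                + ",\"body\":" + eventJson + "}";
        try {
            producer.send(new ProducerRecord<>("folders", resourceId, payload)).get(); // block for ack
            currentVersion.put(resourceId, version + 1);
            return true;
        } catch (Exception e) {
            // The write may or may not have landed; re-read the partition tail (or rely on
            // the eventId for dedup) before retrying.
            throw new RuntimeException(e);
        }
    }
}
```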

https://issues.apache.org/jira/browse/KAFKA-2260 would solve point 1 in a simpler way, but seems to be stalled.

Kafka Streams appears to provide a lot of this for you. For example, point 4 is essentially a KTable, which your event producer can use to work out whether an event is valid for the current resource state before sending it.
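As a rough sketch of that idea (the topology, the "folders" topic name and the `apply` transition function are illustrative assumptions, not a complete validator), the event stream can be folded into a KTable of current folder state:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Properties;

public class FolderStateTable {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "folder-state");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Fold the "folders" event stream into a table of current state per folder id;
        // the producer can consult this table before accepting a new event.
        KTable<String, String> folderState = builder
                .stream("folders", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .aggregate(() -> "INITIAL",                          // state before any event
                           (folderId, event, state) -> apply(state, event));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    // Hypothetical transition function: once a folder is destroyed it stays destroyed,
    // so a later FolderUpdate can be recognised as invalid.
    private static String apply(String state, String event) {
        if (state.equals("DESTROYED")) return state;
        return event.contains("FolderDestroy") ? "DESTROYED" : "ACTIVE";
    }
}
```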

TomW
  • If you have only one process per partition, you don't need to wait for acks - all the processing can be serialized inside that process, and it doesn't matter that some of the messages are still on the wire as far as ordering/consistency is concerned (having guaranteed writes is a separate issue which should probably not be mixed into the explanation at the same time) – Artur Biesiadowski Jun 06 '17 at 14:10
  • But what if, for resource 1, event write A fails, but then you write event B without waiting for A's ack, and B writes OK? Then B is in the stream without A, breaking invariants. Or A is somehow delayed and ends up in the stream after B. Or is this just not possible, since both will use the same TCP connection? Anyway, that simplifies things a bit if it is the case. – TomW Jun 06 '17 at 18:56
  • I don't think it is possible to send events A and then B from a single program to a single Kafka channel and end up with only B there. It will be either nothing, A only, or A and B. The producer connection is a lossless TCP stream with reconnects. Of course, if you mess up your Kafka broker on purpose (set the message deletion time to a few seconds, restart it in the middle of processing to clear the contents, etc.) everything is possible, but in normal operation (even with errors), events should not get lost from the middle of the producer stream. – Artur Biesiadowski Jun 06 '17 at 20:52
  • I'm afraid it is possible, and we've seen it happen. The producer sends A, the broker rejects it because a re-balance just happened and it's no longer the leader for that partition. The producer discovers the new leader and sends B. B is stored but A is not. Only ever use acks=0 if you favour throughput over consistency and are willing to pay the price of message loss in (relatively) normal operation. – Michal Borowiecki Jun 18 '17 at 16:22
  • For #2, I would strongly advise against attempting to read the contents of a stream to ascertain whether a record has been published previously before publishing again, in an attempt to avoid a duplicate. This significantly complicates the producer implementation. The general design philosophy behind Kafka is at-least-once delivery, implying idempotent behaviour in the consumer ecosystem. Your consumers should already be capable of dealing with duplicate events. – Emil Koutanov Jan 16 '21 at 21:44
  • I should update the answer - Kafka now supports idempotent writes, so #2 isn't an issue any more (you can just retry and not get a duplicate if you use the transactional API). – TomW Jan 17 '21 at 22:06