
My goal is to ensure that records published by a DynamoDB stream are processed in the "correct" order. My table contains events for customers. Hash key is Event ID, range key a timestamp. "Correct" order would mean that events for the same customer ID are processed in order. Different customer IDs can be processed in parallel.

I'm consuming the stream via Lambda functions. Consumers are spawned automatically per shard. So if the runtime decides to shard the stream, consumption happens in parallel (if I get this right) and I run the risk of processing a CustomerAddressChanged event before CustomerCreated (for example).

The docs imply that there is no way to influence the sharding. But they don't say so explicitly. Is there a way, e.g., by using a combination of customer ID and timestamp for the range key?

EagleBeak

3 Answers


The assumption that sharding is determined by table keys seems to be correct. My solution will be to use customer ID as hash key and timestamp (or event ID) as range key.

This AWS blog says:

The relative ordering of a sequence of changes made to a single primary key will be preserved within a shard. Further, a given key will be present in at most one of a set of sibling shards that are active at a given point in time. As a result, your code can simply process the stream records within a shard in order to accurately track changes to an item.

This slide confirms it. I still wish the DynamoDB docs would explicitly say so...
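To make the per-shard guarantee concrete, here is a minimal sketch (my addition, not from the original post) of a Lambda handler that relies on it. It assumes the table uses `customerId` as the hash key and `timestamp` as the range key, so all events for one customer land in one shard and arrive in write order; the key names and handler shape are illustrative.

```python
def handler(event, context):
    """Process DynamoDB Streams records in the order Lambda delivers them.

    Within a single shard, Lambda invokes the function with records in
    stream order, so iterating the batch sequentially preserves the
    per-customer ordering (assuming customerId is the hash key).
    """
    processed = []
    for record in event.get("Records", []):
        keys = record["dynamodb"]["Keys"]
        customer_id = keys["customerId"]["S"]   # hash key (assumed name)
        timestamp = keys["timestamp"]["S"]      # range key (assumed name)
        # Real business logic would go here; we just record what was seen.
        processed.append((customer_id, timestamp, record["eventName"]))
    return processed
```

Because each customer's events live in a single shard, the sequential loop is all the ordering logic needed; cross-customer parallelism comes for free from Lambda running one consumer per shard.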

EagleBeak

I just had a response from AWS Support. It seems to confirm @EagleBeak's assumption about partitions being mapped to shards. Or, as I understand it, a partition is mapped to a shard tree.

My question was about REMOVE events due to TTL expiration, but it would apply to all other types of actions too.

  1. Is a shard created per Primary Partition Key? and then if there are too many items in the same partition, the shard gets split into children?

A shard is created per partition in your DynamoDB table. If a partition split is required due to too many items in the same partition, the shard gets split into children as well. A shard might split in response to high levels of write activity on its parent table, so that applications can process records from multiple shards in parallel.

  2. Will those 100 removed items be put in just one shard, provided they all have the same partition key?

Assuming all 100 items have the same partition key value (but different sort key values), they would have been stored on the same partition. Therefore, they would be removed from the same partition and be put in the same shard.

  3. Since "records sent to your AWS Lambda function are strictly serialized", how does this serialization work in the case of TTL? Is the order within a shard established by partition/sort keys, TTL expiration time, etc.?

DynamoDB Streams captures a time-ordered sequence of item-level modifications in your DynamoDB table. This time-ordered sequence is preserved at a per shard level. In other words, the order within a shard is established based on the order in which items were created, updated or deleted.
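As a practical aside for the TTL case: TTL-driven deletions appear in the stream as `REMOVE` records carrying a `userIdentity` marker (type `Service`, principal `dynamodb.amazonaws.com`, per the DynamoDB TTL documentation), which lets a consumer tell them apart from manual deletes. A small sketch of my own, not part of the support response:

```python
def is_ttl_expiry(record):
    """Return True if a stream record is a REMOVE caused by TTL expiration.

    TTL deletions are marked with userIdentity.type == "Service" and
    principalId == "dynamodb.amazonaws.com"; manually deleted items carry
    no such service identity.
    """
    identity = record.get("userIdentity") or {}
    return (
        record.get("eventName") == "REMOVE"
        and identity.get("type") == "Service"
        and identity.get("principalId") == "dynamodb.amazonaws.com"
    )
```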

cortopy
    The answer seems very careful not to outright admit that order is NOT preserved across shards. Shards are processed in parallel, so you can definitely end up processing events totally out of order. It says "This time-ordered sequence is preserved at a per shard level." Why don't they just say: time-ordered sequences are ONLY preserved at a per-shard level, not at the stream level. – Triynko Apr 28 '20 at 19:12

A DynamoDB stream consists of stream records, which are grouped into shards. A shard can spawn child shards in response to a high number of writes on the DynamoDB table, so you can have parent shards and possibly multiple child shards. To ensure that your application processes records in the right sequence, a parent shard must always be processed before its child shards. This is described in detail in the docs.

Unfortunately, DynamoDB Streams records sent to AWS Lambda functions are strictly serialized per shard, and ordering of records across different shards is not guaranteed.

From the AWS Lambda FAQs:

Q: How does AWS Lambda process data from Amazon Kinesis streams and Amazon DynamoDB Streams?

The Amazon Kinesis and DynamoDB Streams records sent to your AWS Lambda function are strictly serialized, per shard. This means that if you put two records in the same shard, Lambda guarantees that your Lambda function will be successfully invoked with the first record before it is invoked with the second record. If the invocation for one record times out, is throttled, or encounters any other error, Lambda will retry until it succeeds (or the record reaches its 24-hour expiration) before moving on to the next record. The ordering of records across different shards is not guaranteed, and processing of each shard happens in parallel.

If you use the DynamoDB Streams Kinesis Adapter, your application will process the shards and stream records in the correct order according to the DynamoDB documentation here. For more information on DynamoDB Streams Kinesis Adapter, see Using the DynamoDB Streams Kinesis Adapter to Process Stream Records.

So, using a DynamoDB Lambda trigger won't guarantee ordering across shards. Your other options are the DynamoDB Streams Kinesis Adapter or the DynamoDB Streams Low-Level API, which is a lot more work.
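To make the "parent before child" rule concrete, here is a rough sketch (my own, not the adapter's actual code) of ordering the shards from a `DescribeStream` response so that every parent is processed before its children. The `ShardId`/`ParentShardId` field names match the DescribeStream API; the function name and input shape are illustrative.

```python
from collections import defaultdict

def shard_processing_order(shards):
    """Order shard IDs so that every parent shard precedes its children.

    `shards` mimics the ShardId / ParentShardId entries returned by
    DescribeStream. A parent that is no longer present in the stream
    (e.g. trimmed) makes its child a root.
    """
    children = defaultdict(list)
    roots = []
    known = {s["ShardId"] for s in shards}
    for s in shards:
        parent = s.get("ParentShardId")
        if parent in known:
            children[parent].append(s["ShardId"])
        else:
            roots.append(s["ShardId"])
    # Depth-first walk: emit each shard before any of its descendants.
    order = []
    stack = list(reversed(roots))
    while stack:
        shard_id = stack.pop()
        order.append(shard_id)
        stack.extend(reversed(children[shard_id]))
    return order
```

A real consumer would then read each shard to completion (until `NextShardIterator` is null) before moving to its children, which is essentially what the Kinesis Adapter automates.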

user818510
    This doesn't answer my question: *how* are records grouped into shards? – EagleBeak May 31 '17 at 06:42
    @user818510 how would you use DynamoDB Streams Low-Level API for guaranteed ordering? – titus Jul 23 '20 at 22:32
  • @user818510 This suggests the risk you are reading into the wording is not actually present: https://stackoverflow.com/a/42187242/11635. @titus the low-level API and/or Kinesis adapter walks the tree of shards and siblings in such a way as to guarantee correctly ordered delivery. Note said API is Java-centric (e.g., there is no meaningful .NET impl other than using Lambda) – Ruben Bartelink May 25 '22 at 14:44