
I am trying to pump lots of data through Amazon Kinesis (on the order of 10,000 points per second).

In order to maximize records per second through my shards, I'd like to round robin my requests over the shards (my application logic doesn't care what shard individual messages go to).

It would seem I could do this with the ExplicitHashKey parameter on the records I send to the PutRecords endpoint; however, the Amazon documentation doesn't actually describe how to use ExplicitHashKey, beyond this oracular statement at:

http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html

Each record in the Records array may include an optional parameter, ExplicitHashKey, which overrides the partition key to shard mapping. This parameter allows a data producer to determine explicitly the shard where the record is stored. For more information, see Adding Multiple Records with PutRecords in the Amazon Kinesis Streams Developer Guide.

(The statement above links to another section of the documentation, which does not discuss ExplicitHashKey at all.)

Is there a way to use ExplicitHashKey to round robin data among shards?

What are valid values for the parameter?

deadcode
  • How would `ExplicitHashKey` minimize the number of partitions? Don't you want to provide a good `partitionKey`, from which Kinesis generates an MD5 hash to decide which partition the record goes to? Say you have 2 partitions and you choose a good `partitionKey`. Then the Kinesis partitioner will send that record to one of those 2 partitions based on the 128-bit value. – prayagupa Jun 16 '17 at 18:02
  • @prayagupd - Assume you want to drive 9,900 data points a second. Since Kinesis supports 1,000 points per second per shard, you should be able to do it with 10 shards. However, if your data distribution is not nearly perfect across the shards, sooner or later one shard will get more than 1,000 points a second. As a practical example, I have a system that puts 13,000 points a second through, using a random number as my partition key, and even with 23 shards I still see rate limit exceptions on rare occasions. If I had a perfect round-robin distribution, I could do it with 13. – deadcode Jun 16 '17 at 21:57
  • Gotcha. But how are you thinking of round robin, then? I could imagine using a sequential number as the `partitionKey` so that records spread uniformly as `sequentialPartitionKey % np`. For simplicity, say one partition can write up to 10 events/1000ms. For a rate of 130 events/1000ms, with sequence numbers from *1 to 130*, exactly 10 events `1, 14, 27, 40, 53, 66, 79, 92, 105, 118` would go to partition1 no matter in what order they are processed, and likewise for the other partitions. You seem to be on the right track in setting your own partition key instead of relying on the MD5 hash. – prayagupa Jun 16 '17 at 22:25
  • Now that I understand things better, for my particular situation I can round robin by setting the ExplicitHashKey of the Nth record to `str( int( (( N%NUM_SHARDS )+0.5) * ( 2**128 / NUM_SHARDS ) ) )`. This works because my shards all happen to have evenly sized hash key ranges. You could instead describe the stream, figure out the hash key ranges dynamically, pick a number in each range, and then round robin over those numbers. – deadcode Jun 17 '17 at 01:31
  • Though I did not understand `str( int( (( N%NUM_SHARDS )+0.5) * ( 2**128 / NUM_SHARDS ) ) )`, it looks interesting. – prayagupa Jun 28 '17 at 21:21
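The formula from deadcode's comment can be sketched in Python as below. This is a minimal sketch that assumes, as the comment does, that the shards split the hash key space into `num_shards` equally sized, contiguous ranges in order; the name `round_robin_hash_key` is hypothetical. It uses integer arithmetic instead of the float division in `2**128 / NUM_SHARDS`, so the 128-bit result is exact, but it still targets the middle of each shard's range.

```python
HASH_KEY_SPACE = 2 ** 128  # hash keys are integers in [0, 2**128 - 1]


def round_robin_hash_key(n: int, num_shards: int) -> str:
    """ExplicitHashKey for the n-th record, cycling through num_shards shards.

    Assumes (hypothetically) that the shards split the hash key space into
    num_shards equally sized, contiguous ranges, in order.
    """
    slot = n % num_shards
    # Same idea as (slot + 0.5) * 2**128 / num_shards: the midpoint of the
    # slot's range, but computed exactly with integers instead of floats.
    midpoint = (2 * slot + 1) * HASH_KEY_SPACE // (2 * num_shards)
    return str(midpoint)


# Records 0, 1, 2, ... cycle through shard slots 0 .. num_shards - 1.
keys = [round_robin_hash_key(n, 4) for n in range(8)]
```

Picking the midpoint rather than the start of each range leaves some margin if the real shard boundaries differ slightly from an exact even split.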

1 Answer


Each shard is assigned a contiguous range of 128-bit integers; together, the open shards of a stream cover the whole hash key space from 0 to 2^128 - 1.

You may find the range of integers assigned to a given shard in a stream via the AWS CLI:

```shell
aws kinesis describe-stream --stream-name name-of-your-stream
```

The output will look like:

```
{
    "StreamDescription": {
        "RetentionPeriodHours": 24,
        "StreamStatus": "ACTIVE",
        "StreamName": "name-of-your-stream",
        "StreamARN": "arn:aws:kinesis:us-west-2:your-stream-info",
        "Shards": [
            {
                "ShardId": "shardId-000000000113",
                "HashKeyRange": {
                    "EndingHashKey": "14794885518301672324494548149207313541",
                    "StartingHashKey": "0"
                },
                "ParentShardId": "shardId-000000000061",
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49574208032121771421311268772132530603758174814974510866"
                }
            },
            { ... more shards ... }
        ...
```
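If you save that output to a file (for example by appending `> stream.json` to the command), a few lines of Python turn the decimal strings into integer ranges. A minimal sketch, assuming the JSON has the structure shown above; `load_hash_key_ranges` is a hypothetical helper name, not part of the AWS CLI or any SDK.

```python
import json
from typing import List, Tuple


def load_hash_key_ranges(describe_stream_json: str) -> List[Tuple[str, int, int]]:
    """Return (ShardId, StartingHashKey, EndingHashKey) for every shard listed
    in `aws kinesis describe-stream` output, with the keys as Python ints."""
    description = json.loads(describe_stream_json)["StreamDescription"]
    ranges = []
    for shard in description["Shards"]:
        key_range = shard["HashKeyRange"]
        # The hash keys arrive as decimal strings; int() handles the full
        # 128-bit range without loss.
        ranges.append((
            shard["ShardId"],
            int(key_range["StartingHashKey"]),
            int(key_range["EndingHashKey"]),
        ))
    return ranges
```

For example, `load_hash_key_ranges(open("stream.json").read())` would list every shard, including closed ones, so you may still need to filter out shards that are no longer open.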

You may set the ExplicitHashKey of a record to the decimal string representation of any integer within a shard's hash key range (StartingHashKey through EndingHashKey, inclusive) to force the record to be sent to that particular shard.
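To check which shard a candidate ExplicitHashKey would land on, you can compare it against each range. A sketch, assuming `ranges` holds only the open shards as (ShardId, StartingHashKey, EndingHashKey) integer tuples; `shard_for_hash_key` and that tuple layout are hypothetical, not part of any AWS SDK.

```python
from typing import List, Optional, Tuple

HASH_KEY_MAX = 2 ** 128 - 1


def shard_for_hash_key(hash_key: str,
                       ranges: List[Tuple[str, int, int]]) -> Optional[str]:
    """Return the ShardId whose [start, end] hash key range contains hash_key.

    `ranges` holds (ShardId, StartingHashKey, EndingHashKey) tuples for the
    open shards; both ends of each range are treated as inclusive.
    Returns None if no listed shard covers the key.
    """
    value = int(hash_key)  # ExplicitHashKey is a base-10 string
    if not 0 <= value <= HASH_KEY_MAX:
        raise ValueError("ExplicitHashKey must be in [0, 2**128 - 1]")
    for shard_id, start, end in ranges:
        if start <= value <= end:
            return shard_id
    return None
```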

Note that due to prior merge and split operations on your stream, there may be many shards with overlapping HashKeyRanges: closed parent shards keep the ranges that their child shards now cover. The currently open shards are the ones whose SequenceNumberRange has no EndingSequenceNumber element.
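That filter can be sketched over the `Shards` list from describe-stream after parsing it with `json.loads`. This is a minimal sketch; `open_shards` is a hypothetical helper name, and it relies only on the EndingSequenceNumber rule stated above.

```python
from typing import Any, Dict, List


def open_shards(shards: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only shards that are still open, i.e. whose SequenceNumberRange
    has no EndingSequenceNumber. This drops the closed parents left behind
    by earlier splits and merges."""
    return [
        shard for shard in shards
        if "EndingSequenceNumber" not in shard.get("SequenceNumberRange", {})
    ]
```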

You can round robin requests among a set of shards by identifying a 128-bit integer within the range of each shard of interest, then assigning the decimal string representations of those integers to successive records' ExplicitHashKey in round-robin order.
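A sketch of that round robin, building entries in the shape PutRecords expects (`Data`, `PartitionKey`, `ExplicitHashKey`). It assumes `ranges` lists the open shards as (ShardId, StartingHashKey, EndingHashKey) integer tuples and picks the midpoint of each range as that shard's key; the function name and the placeholder partition key `"unused"` are hypothetical. PutRecords still requires a PartitionKey on every record, but per the documentation quoted in the question, ExplicitHashKey overrides the partition key to shard mapping.

```python
import itertools
from typing import Iterable, Iterator, List, Tuple


def round_robin_records(payloads: Iterable[bytes],
                        ranges: List[Tuple[str, int, int]]) -> Iterator[dict]:
    """Yield PutRecords entries whose ExplicitHashKey cycles over the shards.

    `ranges` holds (ShardId, StartingHashKey, EndingHashKey) for open shards.
    One key per shard (the midpoint of its range) is picked and reused.
    """
    targets = [str((start + end) // 2) for _, start, end in ranges]
    for payload, hash_key in zip(payloads, itertools.cycle(targets)):
        yield {
            "Data": payload,
            # Required by PutRecords, but ExplicitHashKey decides the shard.
            "PartitionKey": "unused",
            "ExplicitHashKey": hash_key,
        }
```

With boto3, these entries could be sent in batches of at most 500 via `client.put_records(StreamName=..., Records=batch)`; checking `FailedRecordCount` in each response for throttled records is still necessary.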

As a side note, you can also calculate the hash key that a given PartitionKey maps to:

  1. Compute the MD5 sum of the partition key.
  2. Interpret the MD5 sum as a hexadecimal number and convert it to base 10. This will be the hash key for that partition key. You can then look up which shard's hash key range contains it.
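The two steps above can be sketched with Python's standard `hashlib`. This assumes the partition key is encoded as UTF-8 before hashing; `hash_key_for_partition_key` is a hypothetical helper name.

```python
import hashlib


def hash_key_for_partition_key(partition_key: str) -> int:
    """Step 1: MD5 of the partition key (assumed UTF-8 encoded).
    Step 2: read the hex digest as a base-16 number, giving a 128-bit int."""
    digest_hex = hashlib.md5(partition_key.encode("utf-8")).hexdigest()
    return int(digest_hex, 16)  # hexadecimal -> base-10 integer
```

The resulting integer can then be compared against each shard's StartingHashKey and EndingHashKey to find the shard that receives records with that partition key.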
deadcode