
Following the examples in the AWS Kinesis documentation, I am trying to produce to and consume from a Kinesis on-demand stream. When I run the producer, it successfully writes 50 records into the stream. However, reading them back in order seems to be a challenge with on-demand streams.

On-demand streams create as many shards as their internal scaling logic requires. GetRecords needs a shard iterator, and a shard iterator is obtained for a specific shard ID. So, as in the documentation's example, we have to list all the shards in the stream and then read from each shard individually. The example code itself only reads from the last shard.

So if I want to read the data in the order it was produced, how do I do it? I modified the GetRecords example to read from all the shards, as shown below. While each shard individually maintains order, there seems to be no way to read from the "stream" sequentially, at least as far as I can tell from the examples.

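    import java.util.ArrayList;
    import java.util.List;

    import software.amazon.awssdk.core.SdkBytes;
    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
    import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
    import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
    import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
    import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
    import software.amazon.awssdk.services.kinesis.model.Record;
    import software.amazon.awssdk.services.kinesis.model.Shard;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

    public static void getStockTrades(KinesisClient kinesisClient, String streamName)
            throws InterruptedException {

        // Retrieve all shards of the stream. Each page must start after the last
        // shard already seen; otherwise the loop re-fetches the first page forever.
        List<Shard> shards = new ArrayList<>();
        String lastShardId = null;
        DescribeStreamResponse streamRes;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamName(streamName)
                    .exclusiveStartShardId(lastShardId)
                    .build();
            streamRes = kinesisClient.describeStream(describeStreamRequest);
            shards.addAll(streamRes.streamDescription().shards());

            if (!shards.isEmpty()) {
                lastShardId = shards.get(shards.size() - 1).shardId();
            }
        } while (streamRes.streamDescription().hasMoreShards());

        // Read each shard from its oldest record. Order is preserved only within a shard.
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            GetShardIteratorRequest itReq = GetShardIteratorRequest.builder()
                    .streamName(streamName)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
                    .shardId(shardId)
                    .build();
            String shardIterator = kinesisClient.getShardIterator(itReq).shardIterator();

            // A single GetRecords call may return only part of a shard (or nothing),
            // so follow the iterator chain.
            while (shardIterator != null) {
                GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
                        .shardIterator(shardIterator)
                        .limit(1000) // at most 1000 records per call
                        .build();
                GetRecordsResponse result = kinesisClient.getRecords(recordsRequest);

                // The record list may be empty.
                List<Record> recordsList = result.records();
                System.out.printf("Shard id: %s, Number of records: %d%n", shardId, recordsList.size());
                for (Record record : recordsList) {
                    SdkBytes data = record.data();
                    System.out.printf("Seq No: %s - %s%n", record.sequenceNumber(), data.asUtf8String());
                }

                // Stop once we have caught up with the tip of the shard; for a
                // closed shard the next iterator eventually becomes null.
                Long behind = result.millisBehindLatest();
                if (behind != null && behind == 0L) {
                    break;
                }
                shardIterator = result.nextShardIterator();
                Thread.sleep(200); // stay under 5 GetRecords calls per second per shard
            }
        }
    }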

I tried omitting the shard ID, and it threw an exception.

If I switch from on-demand to provisioned mode with a single shard, I can achieve this, but I want to know how to read sequentially in on-demand mode.

Amudhan

1 Answer


Reading records out of order is expected when a stream has multiple shards. AWS Kinesis guarantees ordering only within a single shard (partition).
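To make "ordering within a shard" concrete: Kinesis maps each record's partition key to a 128-bit integer with MD5 and stores the record in the shard whose hash key range contains that value. The sketch below is plain Java with no AWS calls and reproduces only that routing step. `ShardRange`, `twoEvenShards()` and the even two-way split are hypothetical stand-ins for the ranges a real stream reports through `Shard.hashKeyRange()`. It shows why records with the same partition key always land in the same shard, while records with different keys can spread across shards.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

public class PartitionKeyRouting {

    // Hypothetical stand-in for a shard's hash key range (inclusive on both ends).
    record ShardRange(String shardId, BigInteger startHashKey, BigInteger endHashKey) {}

    // MD5 of the partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 => non-negative
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Returns the id of the shard whose range contains the key's hash.
    static String shardFor(String partitionKey, List<ShardRange> shards) {
        BigInteger h = hashKey(partitionKey);
        for (ShardRange s : shards) {
            if (h.compareTo(s.startHashKey()) >= 0 && h.compareTo(s.endHashKey()) <= 0) {
                return s.shardId();
            }
        }
        throw new IllegalArgumentException("No shard covers hash " + h);
    }

    // Hypothetical layout: the 128-bit key space split evenly into two shards.
    static List<ShardRange> twoEvenShards() {
        BigInteger half = BigInteger.ONE.shiftLeft(127);
        BigInteger max = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);
        return List.of(
                new ShardRange("shard-A", BigInteger.ZERO, half.subtract(BigInteger.ONE)),
                new ShardRange("shard-B", half, max));
    }

    public static void main(String[] args) {
        List<ShardRange> shards = twoEvenShards();
        // The same key always routes to the same shard; different keys may not.
        for (String key : List.of("AAPL", "AMZN", "AAPL")) {
            System.out.printf("%s -> %s%n", key, shardFor(key, shards));
        }
    }
}
```

One consequence, stated cautiously: a producer that uses a single constant partition key gets one ordered sequence, but every write then goes to one shard and is capped at that shard's throughput, and if that shard is later split the key's records continue in a child shard.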

The idea behind sharding is that multiple consumers can read records from the partitions in parallel. Say shard-A is assigned to consumer-A and shard-B is assigned to consumer-B.

Now suppose the consumers behaved the way we want and read the data in one sequential order. This creates a problem. When record-A enters shard-A and record-B enters shard-B, consumer-A starts reading record-A immediately, but consumer-B has to wait for consumer-A to finish processing, because we are enforcing sequential order. Similarly, if consumer-A gets stuck or slows down, it starts affecting the other consumers as well.
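The waiting problem is visible in a small sketch of a best-effort "global" reader that merges per-shard record lists by arrival time with a priority queue (a k-way merge). It assumes the records have already been fetched per shard. `ShardRecord` is a hypothetical stand-in for the SDK's `Record`, whose `approximateArrivalTimestamp()` is, as the name says, approximate, so this yields at best an approximate stream-wide order, not a guaranteed producer order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class ShardMerge {

    // Hypothetical stand-in for an SDK Record: shard, arrival time (epoch millis), payload.
    record ShardRecord(String shardId, long arrivalMillis, String data) {}

    // Position inside one shard's list, which is already in shard order.
    private record Cursor(List<ShardRecord> records, int index) {
        ShardRecord head() {
            return records.get(index);
        }
    }

    // k-way merge: repeatedly emit the earliest head record across all shards.
    static List<ShardRecord> mergeByArrival(Map<String, List<ShardRecord>> perShard) {
        Comparator<Cursor> byHead = Comparator
                .comparingLong((Cursor c) -> c.head().arrivalMillis())
                .thenComparing(c -> c.head().shardId()); // deterministic tie-break
        PriorityQueue<Cursor> heads = new PriorityQueue<>(byHead);
        for (List<ShardRecord> records : perShard.values()) {
            if (!records.isEmpty()) {
                heads.add(new Cursor(records, 0));
            }
        }

        List<ShardRecord> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            // In a live stream, this poll is only safe once EVERY shard has a
            // buffered head record; one slow shard stalls the whole output.
            Cursor c = heads.poll();
            merged.add(c.head());
            if (c.index() + 1 < c.records().size()) {
                heads.add(new Cursor(c.records(), c.index() + 1));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, List<ShardRecord>> perShard = Map.of(
                "shard-A", List.of(new ShardRecord("shard-A", 1000, "r1"),
                                   new ShardRecord("shard-A", 1003, "r4")),
                "shard-B", List.of(new ShardRecord("shard-B", 1001, "r2"),
                                   new ShardRecord("shard-B", 1002, "r3")));
        // Prints r1 (shard-A), r2 (shard-B), r3 (shard-B), r4 (shard-A), one per line.
        for (ShardRecord r : mergeByArrival(perShard)) {
            System.out.printf("%s (%s)%n", r.data(), r.shardId());
        }
    }
}
```

With finite lists the merge always completes, but in a live stream the reader could not emit anything until every shard had delivered its next record, which is exactly the blocking described above.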

This is a practical instance of the CAP theorem: if we hold availability constant, we have to give up either consistency (a single global order) or partitioning.

This article discusses the above challenge in detail: https://brandur.org/kinesis-order

For On-Demand Mode

In on-demand mode, AWS manages the number of shards, and the consumer application (for example, one built on the KCL) leases those shards to its workers. Let's assume we have only one consumer, consumer-A, which is assigned shard-A, shard-B, and shard-C. To read the data sequentially, consumer-A has to know the order in which the data is spread across the shards: is it round-robin A, B, C, or C, B, A, or does the producer push records to the shards randomly? Again, forcing consumers to read sequentially across shards/partitions becomes a bottleneck for high throughput and parallel consumption. You can learn how a consumer is assigned a shard by reading about the Kinesis leasing mechanism here.
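Part of "knowing the order in which data lives in the shards" is shard lineage. When a stream reshards (which on-demand mode does on its own), a parent shard is closed and its hash range continues in one or more child shards, so to keep per-key order a consumer has to finish a parent before reading its children. As I understand it, the KCL takes care of this by waiting for parent shards to be processed first. The sketch below is a hypothetical, plain-Java version of the idea: a topological sort (Kahn's algorithm) over the `parentShardId` / `adjacentParentShardId` fields that the SDK's `Shard` model exposes. `ShardInfo` is an assumed stand-in for `Shard`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ShardLineage {

    // Hypothetical stand-in for the SDK's Shard; parent fields are null when absent.
    record ShardInfo(String shardId, String parentShardId, String adjacentParentShardId) {}

    // Kahn's algorithm: a shard becomes ready once all of its known parents are done.
    static List<String> parentsFirst(List<ShardInfo> shards) {
        Set<String> known = new HashSet<>();
        for (ShardInfo s : shards) {
            known.add(s.shardId());
        }

        Map<String, Integer> pendingParents = new HashMap<>();
        Map<String, List<String>> children = new HashMap<>();
        for (ShardInfo s : shards) {
            int count = 0;
            for (String parent : new String[] {s.parentShardId(), s.adjacentParentShardId()}) {
                // Parents no longer listed (e.g. past retention) are treated as done.
                if (parent != null && known.contains(parent)) {
                    count++;
                    children.computeIfAbsent(parent, k -> new ArrayList<>()).add(s.shardId());
                }
            }
            pendingParents.put(s.shardId(), count);
        }

        Deque<String> ready = new ArrayDeque<>();
        for (ShardInfo s : shards) {
            if (pendingParents.get(s.shardId()) == 0) {
                ready.add(s.shardId());
            }
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            order.add(id);
            for (String child : children.getOrDefault(id, List.of())) {
                if (pendingParents.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // shard-0 split into shard-1 and shard-2, which were later merged into shard-3.
        List<ShardInfo> shards = List.of(
                new ShardInfo("shard-3", "shard-1", "shard-2"),
                new ShardInfo("shard-2", "shard-0", null),
                new ShardInfo("shard-1", "shard-0", null),
                new ShardInfo("shard-0", null, null));
        System.out.println(parentsFirst(shards)); // prints [shard-0, shard-2, shard-1, shard-3]
    }
}
```

This only orders shards relative to their lineage; unrelated shards, such as the two children of one split, still have no defined order relative to each other, which is why a stream-wide sequence is not recoverable this way.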

Also, how shards get assigned to consumers depends on whether the consumer uses enhanced fan-out.

Another useful SO thread is here, and the Kinesis shard calculator is here.

glory9211
  • Thanks for the answer. I understand what you mean, but my question was mainly about on-demand mode. We have no control over how many shards are created (or do we?). In on-demand mode, we only look at the "stream", not the "shards", and how the shards will be redistributed is also unknown. So how can we allocate consumers to shards when we don't have that control? In provisioned mode we can, because we control the shards. My question was mainly about achieving a sequential read of the "stream" in on-demand mode; in provisioned mode there is at least one way to do it. – Amudhan May 02 '23 at 13:00
  • You can try creating a single consumer with a single worker, and it should read the data sequentially. You can also check out the lease table to see how your consumers behave and whether each shard is consumed by a single consumer. – glory9211 May 02 '23 at 14:41