Following the examples in the AWS Kinesis documentation, I am trying to produce to and consume from a Kinesis on-demand stream. When I run the producer, it successfully writes 50 records to the stream. Reading them back in order, however, seems to be a challenge with on-demand streams.
On-demand streams create multiple shards as needed, according to their internal logic. The GetRecords API needs a shard iterator, which in turn is requested for a specific shard ID. So, as the documentation example shows, we have to list all the shards in the stream and then read from each shard individually. Even the example code only reads from the last shard.
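For context on why the records end up spread across shards: the PutRecord documentation says an MD5 hash maps each partition key to a 128-bit integer, and each shard owns a range of those values. Below is a minimal sketch of that mapping in plain Java. The names `ShardRouting` and `shardIndex`, and the evenly split ranges, are assumptions for illustration only; a real stream reports each shard's actual range through `hashKeyRange()` on `Shard`.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class ShardRouting {
    // Size of the hash key space: 2^128 possible values.
    static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Assumption: shardCount shards that split the key space into equal ranges.
    // Real shards may have uneven ranges after splits and merges.
    static int shardIndex(String partitionKey, int shardCount) {
        BigInteger width = KEY_SPACE.divide(BigInteger.valueOf(shardCount));
        int index = hashKey(partitionKey).divide(width).intValue();
        // Integer division can leave a few keys past the last range; clamp them.
        return Math.min(index, shardCount - 1);
    }

    public static void main(String[] args) {
        // Hypothetical partition keys, routed over four hypothetical shards.
        for (String key : new String[] {"trade-1", "trade-2", "trade-3"}) {
            System.out.printf("%s -> shard index %d%n", key, shardIndex(key, 4));
        }
    }
}
```

Under this model, records that share a partition key always hash to the same shard, and a shard is the only unit within which order is preserved. Different keys can land anywhere in the key space.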
So, if I want to read the data in the same order it was produced, how can I do it? I modified the GetRecords example to read from all the shards, as shown below. While every shard individually maintains its order, there seems to be no way to read from the "stream" as a whole sequentially, at least as far as I can tell from the examples.
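    import software.amazon.awssdk.core.SdkBytes;
    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
    import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
    import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
    import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
    import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
    import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
    import software.amazon.awssdk.services.kinesis.model.Record;
    import software.amazon.awssdk.services.kinesis.model.Shard;
    import java.util.ArrayList;
    import java.util.List;

    public static void getStockTrades(KinesisClient kinesisClient, String streamName) {
        String lastShardId = null;
        List<Shard> shards = new ArrayList<>();
        DescribeStreamResponse streamRes;

        // Retrieve all shards from the stream, one page at a time.
        do {
            // Start after the last shard already seen; otherwise the same page
            // is requested again and the loop never ends.
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamName(streamName)
                    .exclusiveStartShardId(lastShardId)
                    .build();
            streamRes = kinesisClient.describeStream(describeStreamRequest);
            shards.addAll(streamRes.streamDescription().shards());
            if (!shards.isEmpty()) {
                lastShardId = shards.get(shards.size() - 1).shardId();
            }
        } while (streamRes.streamDescription().hasMoreShards());

        shards.forEach(shard -> {
            String shardId = shard.shardId();

            // Ask for an iterator positioned at the oldest record in this shard.
            GetShardIteratorRequest itReq = GetShardIteratorRequest.builder()
                    .streamName(streamName)
                    .shardIteratorType("TRIM_HORIZON")
                    .shardId(shardId)
                    .build();
            GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq);
            String shardIterator = shardIteratorResult.shardIterator();

            // Create a new GetRecordsRequest with the shard iterator.
            // Return at most 1000 records; this makes a single call per shard only.
            GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
                    .shardIterator(shardIterator)
                    .limit(1000)
                    .build();
            GetRecordsResponse result = kinesisClient.getRecords(recordsRequest);

            // Put the result into a record list. The result may be empty.
            List<Record> recordsList = result.records();
            System.out.printf("Shard id: %s, Number of records: %d%n", shardId, recordsList.size());
            for (Record record : recordsList) {
                SdkBytes byteBuffer = record.data();
                System.out.printf("Seq No: %s - %s%n", record.sequenceNumber(), byteBuffer.asUtf8String());
            }
        });
    }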
I tried omitting the shard ID, and it threw an exception.
If I switch from on-demand to provisioned mode with a single shard, I can achieve this, but I want to know how to read sequentially in on-demand mode.
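One approach I am considering, sketched here under assumptions, is to read every shard as above and then interleave the per-shard batches by arrival time. The SDK's `Record` exposes `approximateArrivalTimestamp()`, but the API documentation describes it as approximate, with millisecond precision and no guarantee that it always increases. So this gives at best an approximate cross-shard order. `ShardMerge`, `Rec`, and `mergeByArrival` are hypothetical names, and `Rec` is a plain-Java stand-in for the SDK type so the sketch runs without AWS dependencies.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class ShardMerge {
    // Hypothetical stand-in for the fields of the SDK's Record used here.
    static final class Rec {
        final String shardId;
        final Instant arrival; // would come from approximateArrivalTimestamp()
        final String data;

        Rec(String shardId, Instant arrival, String data) {
            this.shardId = shardId;
            this.arrival = arrival;
            this.data = data;
        }
    }

    // Read position inside one shard's batch.
    static final class Cursor {
        final List<Rec> batch;
        int pos;

        Cursor(List<Rec> batch) {
            this.batch = batch;
        }

        Rec head() {
            return batch.get(pos);
        }
    }

    // K-way merge: repeatedly take the shard whose next record arrived earliest.
    // Records from one shard are never reordered relative to each other.
    static List<Rec> mergeByArrival(List<List<Rec>> shardBatches) {
        PriorityQueue<Cursor> heads =
                new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.head().arrival));
        for (List<Rec> batch : shardBatches) {
            if (!batch.isEmpty()) {
                heads.add(new Cursor(batch));
            }
        }
        List<Rec> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            Cursor c = heads.poll();
            merged.add(c.head());
            c.pos++;
            if (c.pos < c.batch.size()) {
                heads.add(c); // re-queue with its new head record
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Made-up sample data for two shards.
        List<Rec> shardA = Arrays.asList(
                new Rec("shard-A", Instant.ofEpochMilli(1), "a1"),
                new Rec("shard-A", Instant.ofEpochMilli(3), "a2"));
        List<Rec> shardB = Arrays.asList(
                new Rec("shard-B", Instant.ofEpochMilli(2), "b1"));
        for (Rec r : mergeByArrival(Arrays.asList(shardA, shardB))) {
            System.out.printf("%s %s %s%n", r.arrival, r.shardId, r.data);
        }
    }
}
```

Records that arrive on different shards within the same millisecond have no defined relative order in this scheme. If strict ordering is required, the producer would probably need to write its own sequence field into each record, or send all records with a single partition key.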