I am trying to build a simple application that reads data from AWS Kinesis. I have managed to read data using a single shard but I want to get data from 4 different shards.
Problem is, I have a while loop which iterates as long as the shard is active which prevents me from reading data from different shards. So far I couldn't find an alternative algorithm nor was able to implement a KCL-based solution. Many thanks in advance
public static void DoSomething() {
AmazonKinesisClient client = new AmazonKinesisClient();
//noinspection deprecation
client.setEndpoint(endpoint, serviceName, regionId);
/** get shards from the stream using describe stream method*/
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(streamName);
List<Shard> shards = new ArrayList<>();
String exclusiveStartShardId = null;
do {
describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest);
shards.addAll(describeStreamResult.getStreamDescription().getShards());
if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
} else {
exclusiveStartShardId = null;
}
}while (exclusiveStartShardId != null);
/** shards obtained */
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setShardId(shards.get(0).getShardId());
getShardIteratorRequest.setShardIteratorType("LATEST");
GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
while (!shardIterator.equals(null)) {
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(250);
GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
shardIterator = getRecordsResult.getNextShardIterator();
if(records.size()!=0) {
for(Record r : records) {
System.out.println(r.getPartitionKey());
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}