This is the scenario: I'm using AWS Kinesis. To get records from a shard I need a shard iterator, and Kinesis only returns the next shard iterator in the response to the previous request. The call to "getRecords" is asynchronous, so my attempt to iterate this process fails: the request hasn't resolved by the time the next shard iterator is needed.
const getRecordsInShard = (shard, batchSize, streamName) => {
  const records = [];
  const loop = (response) => {
    if (_.isEmpty(response.NextShardIterator)) {
      return Promise.resolve(records);
    }
    records.push(response.Records);
    return getRecordsByShardIterator(response.NextShardIterator, batchSize).then(loop);
  };
  return getStreamIterator(shard.ShardId, shard.SequenceNumberRange.StartingSequenceNumber, (streamName || process.env.KINESIS_STREAM))
    .then(response => getRecordsByShardIterator(response.ShardIterator, batchSize))
    .then(loop);
};
The above code fails because the promise returned from loop doesn't resolve before the outer function returns. How can I iterate a promise-returning function sequentially, using the return value of the previous iteration as the input to the next?
Caveats: each iteration relies on the information returned by the previous one, and the result needs to collect the records from all iterations together.
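
For illustration, here is a minimal sketch of the sequential pattern described above, assuming a runtime that supports async/await. `fakePages` and `fakeGetRecords` are hypothetical stand-ins for a real shard and for `getRecordsByShardIterator`; they are not part of the original code or of the AWS SDK. The sketch stores each page's records before checking whether another iterator exists, so the last page is not dropped, and it spreads each page into one flat array.

```javascript
// Hypothetical in-memory "shard": three pages keyed by shard iterator.
const fakePages = {
  'it-0': { Records: ['a', 'b'], NextShardIterator: 'it-1' },
  'it-1': { Records: ['c'], NextShardIterator: 'it-2' },
  'it-2': { Records: ['d'], NextShardIterator: null },
};

// Hypothetical stand-in for getRecordsByShardIterator: resolves asynchronously.
const fakeGetRecords = (iterator, batchSize) =>
  new Promise(resolve => setTimeout(() => resolve(fakePages[iterator]), 5));

// Follow NextShardIterator one request at a time, collecting every page.
const collectRecords = async (firstIterator, batchSize, getRecords) => {
  const records = [];
  let iterator = firstIterator;
  while (iterator) {
    // Wait for this request; its response carries the next iterator.
    const response = await getRecords(iterator, batchSize);
    records.push(...response.Records); // keep this page before checking for the end
    iterator = response.NextShardIterator;
  }
  return records;
};

// The function returns a promise, so the caller reads the result via then/await.
collectRecords('it-0', 100, fakeGetRecords).then(records => console.log(records));
```

The caller has to consume the returned promise with `.then` or `await`; the collected array is never available synchronously when the function returns. Against a real stream, note that `NextShardIterator` is only null once a shard is closed, so an open shard would probably need an additional stop condition (for example, stopping after an empty page).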