I want to understand when the worker invokes IRecordProcessor's processRecords method. If my earlier call to processRecords has not yet completed, will the worker call processRecords again? Will the worker start fetching new records from Kinesis, or will it wait until the current records have finished processing?

Basically, I want to wait for a long time if processRecords hits an exception while saving records to an external db, for example because the db was down or some other error occurred. So I want to confirm that this will not cause any issue, i.e. that the worker does not start fetching new records until the earlier ones have finished processing.

user1846749

1 Answer

Excerpts from answers to other questions:

The application (with the help of KCL) will continue to poll "Shard Iterator" in the background, thus you will be notified about the new data when it comes.

Source: https://stackoverflow.com/a/35582161/1622134

Also, the "worker" you mention is the "Worker" in the application, which is a Runnable.

Each shard is processed by exactly one KCL worker and has exactly one corresponding record processor, so you never need multiple instances to process one shard. See the Worker.java class in KCL source.

Source: https://stackoverflow.com/a/34509567/1622134

To answer your question, you can handle that in your processRecords implementation. While processing records, use a try-catch block and write the checkpoint to DynamoDB if and only if the try part succeeds. That way, if there is an error while writing to the external db, you will not lose records, and upon restart the application resumes from the last checkpoint. You should also save the record data that could not be inserted into the db to another place, so it can be processed later.
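
The checkpoint-only-on-success pattern can be sketched roughly as follows. This is a minimal illustration, not KCL code: RecordStore, Checkpointer, and processBatch are hypothetical stand-ins invented here so the example runs without the KCL dependency. In a real record processor, the body of processBatch would sit inside processRecords and the checkpoint call would go through the checkpointer that KCL passes in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CheckpointOnSuccessSketch {

    /** Hypothetical stand-in for the external database client. */
    public interface RecordStore {
        void save(String record) throws Exception;
    }

    /** Hypothetical stand-in for the checkpointer that KCL hands to processRecords. */
    public interface Checkpointer {
        void checkpoint() throws Exception;
    }

    /**
     * Saves every record, then checkpoints. If anything throws, no further
     * checkpoint is attempted and the records that were not saved are copied
     * to deadLetter so they can be handled later.
     *
     * @return true only if all records were saved and the checkpoint succeeded
     */
    public static boolean processBatch(List<String> records, RecordStore store,
                                       Checkpointer checkpointer, List<String> deadLetter) {
        int saved = 0;
        try {
            for (String record : records) {
                store.save(record);
                saved++;
            }
            // Reached only when every save succeeded.
            checkpointer.checkpoint();
            return true;
        } catch (Exception e) {
            // Park whatever was not saved; the checkpoint is left where it was.
            deadLetter.addAll(records.subList(saved, records.size()));
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> deadLetter = new ArrayList<>();
        int[] checkpoints = {0};

        // Simulated outage: the store fails as soon as it sees record "b".
        RecordStore flakyStore = record -> {
            if (record.equals("b")) {
                throw new Exception("db down");
            }
        };

        boolean ok = processBatch(Arrays.asList("a", "b", "c"), flakyStore,
                                  () -> checkpoints[0]++, deadLetter);

        System.out.println("checkpointed=" + ok
                + " checkpoints=" + checkpoints[0]
                + " deadLetter=" + deadLetter);
    }
}
```

Whether to park failed records in a separate place, as here, or to retry inside processRecords until the db comes back is a design choice; the sketch only shows the first option.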

Also see this answer: https://stackoverflow.com/a/32517002/1622134

az3
  • In Worker.java, runProcessLoop calls shardConsumer.consumeShard(), which calls checkAndSubmitNextTask(), which in turn checks readyForNextTask. If it is not ready, it does not consume new records. So how is it possible that the worker retrieves new records before the record processor has processed the previous ones? – user1846749 Jan 23 '17 at 18:55
  • If there is a temporary db outage on your side (which prevents consuming records), you should stop your Kinesis Consumer Application until that is fixed. Or there is a second approach: in the final link of my answer, there is a line that addresses your question: "But if it fails, note it down to another place to investigate the reason why it failed." So you can process the records consumed during the db outage later, manually. – az3 Jan 26 '17 at 10:07