- I am consuming a Kinesis stream with Spark Streaming 2.2.0 and spark-streaming-kinesis-asl_2.11 (a rough sketch of the setup follows this list).
- The Kinesis stream has 150 shards, and I am monitoring the GetRecords.IteratorAgeMilliseconds CloudWatch metric to see whether the consumer is keeping up with the stream.
- The Kinesis stream has the default data retention of 86400 seconds (1 day).
- I am debugging a case where a few Kinesis shards reached the maximum GetRecords.IteratorAgeMilliseconds of 86400000 (== the retention period).
- This is only true for some shards (let's call them "outdated shards"), not all of them.
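For context, here is a minimal sketch of what the consumer setup looks like, assuming the plain KinesisUtils.createStream receiver API from spark-streaming-kinesis-asl; the application name, stream name, endpoint/region, and receiver count below are placeholders, not my actual values:

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KinesisConsumerSketch {
  def main(args: Array[String]): Unit = {
    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(new SparkConf().setAppName("kinesis-consumer"), batchInterval)

    // Each createStream call starts one receiver running a KCL worker; the KCL
    // distributes the 150 shards across these workers and records leases/checkpoints
    // in a DynamoDB table named after the application name ("my-kcl-app" here).
    val numReceivers = 10
    val streams = (0 until numReceivers).map { _ =>
      KinesisUtils.createStream(
        ssc,
        "my-kcl-app",                              // also the DynamoDB lease table name
        "my-stream",                               // Kinesis stream with 150 shards
        "https://kinesis.us-east-1.amazonaws.com", // endpoint (placeholder)
        "us-east-1",                               // region (placeholder)
        InitialPositionInStream.TRIM_HORIZON,
        batchInterval,                             // checkpoint interval
        StorageLevel.MEMORY_AND_DISK_2)
    }

    val unified = ssc.union(streams)
    unified.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```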
I have identified the shardIds of the outdated shards. One of them is shardId-000000000518, and in the DynamoDB table that holds the checkpointing information I can see the following entry:
leaseKey: shardId-000000000518
checkpoint: 49578988488125109498392734939028905131283484648820187234
checkpointSubSequenceNumber: 0
leaseCounter: 11058
leaseOwner: 10.0.165.44:52af1b14-3ed0-4b04-90b1-94e4d178ed6e
ownerSwitchesSinceCheckpoint: 37
parentShardId: { "shardId-000000000269" }
I can see the following in the logs of the worker on 10.0.165.44:
17/11/22 01:04:14 INFO Worker: Current stream shard assignments: shardId-000000000339, ..., shardId-000000000280, shardId-000000000518
... which should mean that shardId-000000000518 was assigned to this worker. However, I never see anything else in the logs for this shardId. If the worker is not consuming from this shard (even though it should be), that would explain why GetRecords.IteratorAgeMilliseconds never decreases. For some other (non-outdated) shardIds, I can see lines like the following in the logs:
17/11/22 01:31:28 INFO SequenceNumberValidator: Validated sequence number 49578988151227751784190049362310810844771023726275728690 with shard id shardId-00000000033
I have verified that the outdated shards do have data flowing into them by looking at the IncomingRecords CloudWatch metric.
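(For completeness, this is roughly how the per-shard IncomingRecords check can be done via the CloudWatch API. It is only a sketch: it assumes enhanced shard-level monitoring is enabled on the stream, and the stream name and region are placeholders.)

```scala
import java.util.Date

import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
import com.amazonaws.services.cloudwatch.model.{Dimension, GetMetricStatisticsRequest}

import scala.collection.JavaConverters._

object ShardIncomingRecords {
  def main(args: Array[String]): Unit = {
    val cloudWatch = AmazonCloudWatchClientBuilder.standard().withRegion("us-east-1").build()

    // Shard-level Kinesis metrics carry both StreamName and ShardId dimensions
    // (only published when enhanced shard-level monitoring is enabled).
    val request = new GetMetricStatisticsRequest()
      .withNamespace("AWS/Kinesis")
      .withMetricName("IncomingRecords")
      .withDimensions(
        new Dimension().withName("StreamName").withValue("my-stream"),
        new Dimension().withName("ShardId").withValue("shardId-000000000518"))
      .withStartTime(new Date(System.currentTimeMillis() - 6 * 3600 * 1000L)) // last 6 hours
      .withEndTime(new Date())
      .withPeriod(300) // 5-minute buckets
      .withStatistics("Sum")

    val datapoints = cloudWatch.getMetricStatistics(request).getDatapoints.asScala
    datapoints.toSeq.sortBy(_.getTimestamp.getTime).foreach { dp =>
      println(s"${dp.getTimestamp}: ${dp.getSum} records")
    }
  }
}
```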
How can I debug/resolve this? Why would these shardIds never get picked up by the Spark worker?