The core application takes 500ms on average to process a record.
I have tried the patterns below, but unfortunately I couldn't get rid of rebalancing.
Total events: 360,000.
Confluent Platform: 3-node cluster; each node is a VM with a 2 TB SSD and a fast NIC.
Application node details: 24 cores, 96 GB memory. While the application is running, used memory is ~20 GB (other applications may also be consuming) and CPU usage is ~400% (out of 24 cores).
Source topic detail: 1 topic, 10 partitions.
Here, the number of entities in a record decides the processing speed of an event:
- all: at most 100 entities per record (event/Kafka message); average processing time for this is ~500 ms
- 20: at most 20 entities per record (event/Kafka message); average processing time is lower
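The timing pressure these numbers create on the poll loop can be sketched with back-of-the-envelope arithmetic (assuming the Kafka Streams default of max.poll.records = 1000 and the consumer default max.poll.interval.ms of 5 minutes; the class and variable names here are just illustrative):

```java
public class PollBudget {
    public static void main(String[] args) {
        long avgProcessingMs = 500;        // average per-record processing time (from above)
        long maxPollRecords = 1000;        // Kafka Streams default for max.poll.records
        long maxPollIntervalMs = 300_000;  // consumer default max.poll.interval.ms (5 min)

        // Worst case: one poll loop processes a full batch before calling poll() again.
        long worstCaseLoopMs = avgProcessingMs * maxPollRecords; // 500,000 ms
        System.out.println(worstCaseLoopMs > maxPollIntervalMs); // true: member exceeds the interval

        // With max.poll.records = 10, the loop fits comfortably inside the interval.
        System.out.println(avgProcessingMs * 10 > maxPollIntervalMs); // false
    }
}
```

So at 500 ms per record, a full default-sized batch can take far longer than the allowed interval between poll() calls, which by itself can trigger rebalancing.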
I have referred to many forums: this answer, KIP-62, the Confluent docs, @Matthias J. Sax's answers, and this blog.
I am having a hard time setting these values to avoid rebalancing.
Heartbeat expiration logs:
[2021-07-27 07:11:50,775] INFO [GroupCoordinator 3]: Preparing to rebalance group
abc in state PreparingRebalance with old generation 594 (__consumer_offsets-13)
(reason: removing member abc-990854d8-
f8d7-4b77-9318-2542000258d2-StreamThread-1-consumer-badbdf8b-6705-4319-be8f-57a71d2366ef
on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
The moment max.poll.records was reduced to 10, there were no more heartbeat expirations. But rebalancing still happened, apparently due to a metadata change(?):
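For reference, the consumer-level settings involved look roughly like this (a sketch, not my exact configuration; the property names are standard Kafka consumer configs, and the values illustrate the max.poll.records = 10 run):

```java
import java.util.Properties;

public class StreamsTimeouts {
    public static Properties tunedConfig() {
        Properties props = new Properties();
        // Keep each poll loop short: 10 records * ~500 ms = ~5 s of processing per poll.
        props.put("max.poll.records", "10");
        // How long the broker waits for a heartbeat before declaring the member dead.
        props.put("session.timeout.ms", "30000");
        // Heartbeat cadence; the docs recommend no more than 1/3 of session.timeout.ms.
        props.put("heartbeat.interval.ms", "10000");
        // Upper bound on time between poll() calls before the consumer leaves the group.
        props.put("max.poll.interval.ms", "300000"); // 5 min, well above ~5 s per batch
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedConfig().getProperty("max.poll.records"));
    }
}
```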
[2021-07-28 12:09:03,030] INFO [GroupCoordinator 3]: Preparing to rebalance group abc
in state PreparingRebalance with old generation 10 (__consumer_offsets-13)
(reason: Updating metadata for member
abc-e0cb67ba-e587-44f9-844b-746cd498392a-StreamThread-1-consumer-
e875d55d-75b5-4b0a-ad10-cc045223690d during Stable) (kafka.coordinator.group.GroupCoordinator)
The thing that confuses me most is the heartbeat expiration: there was no network error, and the application didn't crash. Why does this error occur, and why does the application receive duplicate messages at exactly that time? (The timestamps match between the application log and the Kafka log.)
Another two executions:
Is there any reason for a metadata update when none of the threads is down (judging from the logs, there are no heartbeat expirations)? And how do we at least minimize these duplicates?