
The core application takes 500ms on average to process a record.

I have tried the patterns below, but unfortunately I couldn't get rid of rebalancing.

Total events: 360,000.

Confluent Platform: 3-node cluster of VMs, each with a 2 TB SSD and a fast NIC.

Application node details: 24 cores, 96 GB memory. While the application is running, used memory is 20 GB (other applications may also be consuming memory) and CPU usage is about 400% (out of 24 cores).

Source topic detail: 1 topic, 10 partitions.

Processing details

Here, the number of entities decides the processing speed of an event:

- all: at most 100 entities per record (event/Kafka message); average processing time for this is 500 ms
- 20: at most 20 entities per record (event/Kafka message); average processing time is lower
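To sanity-check the poll-loop budget, I did this back-of-the-envelope calculation. My assumptions (please correct me if wrong): Kafka Streams defaults `max.poll.records` to 1000 for its main consumer, and the consumer default `max.poll.interval.ms` is 300,000 ms (5 minutes).

```java
public class PollBudget {
    // Worst-case time for one poll loop: records returned per poll()
    // multiplied by the worst-case per-record processing time.
    static long worstCaseLoopMs(long maxPollRecords, long avgMsPerRecord) {
        return maxPollRecords * avgMsPerRecord;
    }

    public static void main(String[] args) {
        // Assumed Streams default max.poll.records = 1000, 500 ms per record:
        long budget = worstCaseLoopMs(1000, 500);
        // 500,000 ms clearly exceeds the assumed 300,000 ms max.poll.interval.ms,
        // so the member would be kicked from the group between polls.
        System.out.println(budget + " ms, exceeds 300000 ms: " + (budget > 300_000));
        // With max.poll.records = 10 the loop is only ~5 s, well within budget:
        System.out.println(worstCaseLoopMs(10, 500) + " ms");
    }
}
```

That would at least explain why lowering `max.poll.records` helps, even if it doesn't explain the heartbeat expiration itself (heartbeats are sent from a background thread per KIP-62).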

I have referred to many forums: this answer, KIP-62, the Confluent docs, @Matthias J. Sax's answers, and this blog.

I am having a hard time setting these values to avoid rebalancing.
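For reference, this is the kind of starting point I've been experimenting with. The values are illustrative guesses, not verified fixes, and `session.timeout.ms` has to stay within the broker's `group.min.session.timeout.ms`/`group.max.session.timeout.ms` range; in the real app these would go into the `StreamsConfig`:

```java
import java.util.Properties;

public class RebalanceTuning {
    static Properties streamsProps() {
        Properties p = new Properties();
        // Placeholders; adjust for the actual cluster.
        p.put("application.id", "abc");
        p.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");

        // Keep the poll-loop budget small: 10 records * 500 ms = 5 s per loop,
        // far below max.poll.interval.ms.
        p.put("max.poll.records", "10");
        p.put("max.poll.interval.ms", "300000");

        // Heartbeats come from a background thread; give the coordinator more
        // slack before expiring the member. heartbeat.interval.ms is kept at
        // roughly 1/3 of session.timeout.ms, per the usual recommendation.
        p.put("session.timeout.ms", "30000");
        p.put("heartbeat.interval.ms", "10000");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
    }
}
```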

Heartbeat expiration logs:

[2021-07-27 07:11:50,775] INFO [GroupCoordinator 3]: Preparing to rebalance group abc in state PreparingRebalance with old generation 594 (__consumer_offsets-13) (reason: removing member abc-990854d8-f8d7-4b77-9318-2542000258d2-StreamThread-1-consumer-badbdf8b-6705-4319-be8f-57a71d2366ef on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)

The moment max.poll.records was reduced to 10, there were no more heartbeat expirations. But rebalancing still happened, apparently due to a metadata change(?):

[2021-07-28 12:09:03,030] INFO [GroupCoordinator 3]: Preparing to rebalance group abc in state PreparingRebalance with old generation 10 (__consumer_offsets-13) (reason: Updating metadata for member abc-e0cb67ba-e587-44f9-844b-746cd498392a-StreamThread-1-consumer-e875d55d-75b5-4b0a-ad10-cc045223690d during Stable) (kafka.coordinator.group.GroupCoordinator)

What confuses me most is the heartbeat expiration: there was no network error and the application didn't crash. Why does this error occur, and why does the application receive duplicate messages at that time? (The timestamps match between the application log and the Kafka log.)
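From what I've read, with the default at-least-once guarantee any records processed since the last offset commit are replayed after a rebalance, which would explain duplicates even without a crash. A sketch of the two mitigations I'm considering (the keys are standard Streams configs; the values are my guesses, and `exactly_once_v2` assumes brokers >= 2.5):

```java
import java.util.Properties;

public class DuplicateMitigation {
    static Properties props() {
        Properties p = new Properties();
        // Option 1: commit offsets more often than the at-least-once default
        // (30000 ms), shrinking the window of records that get replayed
        // after a rebalance.
        p.put("commit.interval.ms", "5000");

        // Option 2: exactly-once processing, which removes duplicates within
        // the Streams topology entirely (requires transactional brokers).
        p.put("processing.guarantee", "exactly_once_v2");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(props());
    }
}
```

I realize these only reduce or eliminate replays; they don't explain why the heartbeat expired in the first place.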

Another two executions:


Is there any reason for a metadata update when none of the threads is down (assuming, from the logs, that there are no heartbeat expirations)? And how do we keep these duplicates to a minimum?
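One more thing I plan to try is static membership (KIP-345), which as I understand it lets a member restart within `session.timeout.ms` without triggering a rebalance. A minimal sketch, assuming brokers and clients new enough to support it; the instance id value is made up and must be unique per application instance:

```java
import java.util.Properties;

public class StaticMembership {
    static Properties props() {
        Properties p = new Properties();
        // Static membership: a member rejoining with the same group.instance.id
        // within session.timeout.ms keeps its assignment, so the coordinator
        // does not start a rebalance for a quick restart.
        p.put("group.instance.id", "abc-instance-1"); // hypothetical id
        // Generous restart window for the static member:
        p.put("session.timeout.ms", "60000");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(props());
    }
}
```

I'm not sure whether this helps with the metadata-update rebalances specifically, so treat it as an experiment rather than a fix.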

