
I have a consumer script which processes each message and commits offsets manually to the topic.

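import json

from kafka import KafkaConsumer
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.structs import OffsetAndMetadata, TopicPartition

# KAFKA_TOPIC, KAFKA_SERVER and CONSUMER_GROUP are defined elsewhere in the script.
CONSUMER = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    auto_offset_reset="earliest",
    max_poll_records=100,
    enable_auto_commit=False,  # offsets are committed manually below
    group_id=CONSUMER_GROUP,
    # Use the RoundRobinPartition method
    partition_assignment_strategy=[RoundRobinPartitionAssignor],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)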

count = 0
while True:
    count += 1
    LOGGER.info("--------------Poll {0}---------".format(count))
    for msg in CONSUMER:
        # Process msg.value
        # Commit the offset of the next message to read (last processed + 1)
        tp = TopicPartition(msg.topic, msg.partition)
        offsets = {tp: OffsetAndMetadata(msg.offset + 1, None)}
        CONSUMER.commit(offsets=offsets)
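
On the commit itself: Kafka treats a committed offset as the position of the next message to read, so the value committed after processing a message is normally that message's offset plus one. The following is only a minimal sketch of that bookkeeping, kept free of any broker connection. `Record` and `offsets_to_commit` are hypothetical names for illustration, not part of kafka-python; the resulting pairs would still need to be wrapped in `TopicPartition` and `OffsetAndMetadata` before being passed to `commit()`.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumed message; only the three fields
# needed for offset bookkeeping are modelled here.
Record = namedtuple("Record", ["topic", "partition", "offset"])


def offsets_to_commit(records):
    """Return {(topic, partition): next_offset} for processed records.

    The committed value is the highest processed offset + 1, i.e. the
    position of the next message to read from that partition.
    """
    positions = {}
    for rec in records:
        key = (rec.topic, rec.partition)
        next_offset = rec.offset + 1
        if next_offset > positions.get(key, 0):
            positions[key] = next_offset
    return positions


# Illustrative batch: two messages from partition 0, one from partition 1.
batch = [Record("events", 0, 41), Record("events", 0, 42), Record("events", 1, 7)]
to_commit = offsets_to_commit(batch)
```

Collecting one position per partition like this also allows a single commit per batch instead of one commit per message, which may reduce the number of round trips to the group coordinator.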

Each message takes less than 1 second to process.

I get this error:

kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.


Process finished with exit code 1

Questions:

a) How do I fix this error?

b) How can I make sure my manual commit is working properly?

c) What is the correct way of committing offsets?

I have gone through "Difference between session.timeout.ms and max.poll.interval.ms for Kafka 0.10.0.0 and later versions" to understand my problem, but any help on tuning the poll, session, or heartbeat timeouts is still much appreciated.

Apache Kafka: 2.11-2.1.0, kafka-python: 1.4.4

OneCricketeer
Shakeel

1 Answer

The consumer's `session.timeout.ms` must fall within the range the broker allows: no higher than the broker's `group.max.session.timeout.ms` and no lower than its `group.min.session.timeout.ms`.
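
As a rough illustration of that constraint, here is a minimal sketch that checks a set of consumer timeouts before they are handed to the client. `consumer_timeouts` is a hypothetical helper, and the broker limits of 6000 ms and 300000 ms are assumptions used as defaults here; the real values should be read from the broker configuration. The heartbeat check follows the usual guidance of keeping the heartbeat interval at no more than a third of the session timeout.

```python
def consumer_timeouts(session_timeout_ms, heartbeat_interval_ms,
                      max_poll_interval_ms,
                      broker_min_session_ms=6000,      # assumed broker limit
                      broker_max_session_ms=300000):   # assumed broker limit
    """Validate consumer timeouts and return them as keyword arguments."""
    # The session timeout must sit inside the range the broker accepts.
    if not broker_min_session_ms <= session_timeout_ms <= broker_max_session_ms:
        raise ValueError(
            "session_timeout_ms must be between {0} and {1}".format(
                broker_min_session_ms, broker_max_session_ms))
    # Leave room for several heartbeats within one session timeout.
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        raise ValueError(
            "heartbeat_interval_ms should be at most 1/3 of session_timeout_ms")
    return {
        "session_timeout_ms": session_timeout_ms,
        "heartbeat_interval_ms": heartbeat_interval_ms,
        "max_poll_interval_ms": max_poll_interval_ms,
    }


# Illustrative values only, not a tuning recommendation.
timeouts = consumer_timeouts(30000, 10000, 300000)
```

The returned dictionary uses the keyword names kafka-python's `KafkaConsumer` accepts, so it could be passed along with the existing arguments as `KafkaConsumer(KAFKA_TOPIC, ..., **timeouts)`.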

Selvaram G
  • I couldn't find the `group.max.session.timeout.ms` config. In my server.properties I found `group.initial.rebalance.delay.ms=0` under "Group Coordinator Settings". Did you mean this setting on the broker side? – Shakeel Feb 08 '19 at 22:33