
I have read a lot of articles and the official Kafka documentation but could not figure out the issue here.

I have the following Kafka consumer code:

from kafka import KafkaConsumer

response_consumer = KafkaConsumer(<topic_name>, bootstrap_servers=<server_list>,
                                  consumer_timeout_ms=15000, auto_offset_reset='earliest')
result = []
for message in response_consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
    result.append(message.value)
response_consumer.close()

The above code works with auto_offset_reset='earliest' but not with auto_offset_reset='latest'. By "not working" I mean the following: I put a breakpoint at the for loop and send a message using the producer:

  1. With auto_offset_reset='earliest' I get all the messages, including the most recent one, in result
  2. With auto_offset_reset='latest' I do not get any messages in result
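For reference, auto_offset_reset only decides where a consumer starts when it has no committed offset for a partition (for example, no group_id, or a group that has never committed). 'earliest' starts at the oldest retained message; 'latest' starts at the current end of the log, so only messages written after that position is resolved are returned. Below is a toy in-memory model of that rule; PartitionLog and initial_position are hypothetical names, not part of kafka-python's API.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PartitionLog:
    """Hypothetical in-memory stand-in for one Kafka partition (no retention)."""
    messages: list = field(default_factory=list)

    def append(self, value) -> int:
        self.messages.append(value)
        return len(self.messages) - 1  # offset assigned to the new message

    @property
    def log_start_offset(self) -> int:
        return 0  # nothing is ever deleted in this model

    @property
    def log_end_offset(self) -> int:
        return len(self.messages)  # offset the *next* message will receive


def initial_position(log: PartitionLog, committed: Optional[int], reset: str) -> int:
    """Starting offset, mimicking how auto_offset_reset is applied."""
    if committed is not None:
        return committed  # a committed offset takes precedence over the reset policy
    if reset == "earliest":
        return log.log_start_offset
    if reset == "latest":
        return log.log_end_offset
    raise ValueError(f"unknown reset policy: {reset!r}")


log = PartitionLog()
for v in ["old-1", "old-2", "old-3"]:
    log.append(v)

print(log.messages[initial_position(log, None, "earliest"):])  # ['old-1', 'old-2', 'old-3']
print(log.messages[initial_position(log, None, "latest"):])    # []
```

With 'latest' the consumer sees nothing that already existed when its position was looked up, which is why the existing messages never show up in result.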

I read this thread, but it did not solve the issue: kafka-python consumer not receiving messages (I tried setting group_id; it does not help).

Any help is appreciated, thank you.

Update: The code below works fine (result does not contain all messages from the beginning, since auto_offset_reset='latest', and result1 contains only the most recently produced message):

    from kafka import KafkaConsumer

    response_consumer = KafkaConsumer(<topic_name>, bootstrap_servers=<server_list>,
                                      consumer_timeout_ms=15000, auto_offset_reset='latest')
    result = []
    for message in response_consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
        result.append(message.value)

    # Send a new message via the producer here

    result1 = []
    for message in response_consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
        result1.append(message.value)
    response_consumer.close()
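A likely explanation for the difference: a Kafka consumer resolves its 'latest' starting offset lazily, during the first poll (here, the first iteration of the for loop), not when KafkaConsumer(...) is constructed. With the breakpoint placed before the loop, the message is written before that lookup happens, so 'latest' already points past it. In the updated code the first loop runs for consumer_timeout_ms, which fixes the position first; the message sent afterwards lands after that position and the second loop receives it. A toy model of this ordering (LazyConsumer is a hypothetical class, not kafka-python's):

```python
from typing import List, Optional


class LazyConsumer:
    """Hypothetical model of a consumer whose 'latest' start offset is
    resolved on the first poll, not when the object is constructed."""

    def __init__(self, log: List[str]) -> None:
        self.log = log                       # shared list standing in for a partition
        self.position: Optional[int] = None  # unresolved until the first poll()

    def poll(self) -> List[str]:
        if self.position is None:
            # auto_offset_reset='latest': jump to the current end of the log
            self.position = len(self.log)
        records = self.log[self.position:]
        self.position = len(self.log)
        return records


# Original question: produce while paused *before* the first poll.
log = ["old"]
consumer = LazyConsumer(log)
log.append("new")       # message sent at the breakpoint
print(consumer.poll())  # [] -> 'latest' was resolved after "new" was written

# Update: poll once first (the first for loop), then produce.
log = ["old"]
consumer = LazyConsumer(log)
print(consumer.poll())  # [] -> position is now fixed at the end of the log
log.append("new")
print(consumer.poll())  # ['new']
```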
user_11077035

1 Answer


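Flush the producer after sending the message. KafkaProducer.send() is asynchronous: it only queues the record for a background sender and returns a future. flush() blocks until all queued records have been sent, which makes sure the message is actually delivered before the application moves on or stops.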

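from kafka import KafkaProducer

self.producer = KafkaProducer(
    bootstrap_servers=[constants.KAFKA_HOST],
    value_serializer=self._serialize
)

# send() only queues the record; flush() blocks until it has been sent
self.producer.send(topic=TOPIC, value=data)
self.producer.flush()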
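To see why the flush matters, here is a toy model of an asynchronous producer. BufferedProducer is a hypothetical class, not kafka-python's KafkaProducer: send() only queues a record and flush() forces every queued record out. In kafka-python itself, calling .get(timeout=...) on the future returned by send() is another way to block until that single record is acknowledged.

```python
from typing import List


class BufferedProducer:
    """Hypothetical model of an asynchronous producer: send() only queues a
    record; flush() delivers every queued record before returning."""

    def __init__(self, broker_log: List[bytes]) -> None:
        self.broker_log = broker_log    # stands in for the topic on the broker
        self._buffer: List[bytes] = []  # records accepted but not yet delivered

    def send(self, value: bytes) -> None:
        self._buffer.append(value)      # returns immediately, nothing delivered yet

    def flush(self) -> None:
        # Deliver everything that is still buffered, then clear the buffer.
        self.broker_log.extend(self._buffer)
        self._buffer.clear()


topic: List[bytes] = []
producer = BufferedProducer(topic)
producer.send(b"hello")
print(topic)      # [] -> nothing has reached the (modeled) broker yet
producer.flush()
print(topic)      # [b'hello']
```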

Consume:

from kafka import KafkaConsumer

self.consumer = KafkaConsumer(
    self._topic,
    bootstrap_servers=[constants.KAFKA_HOST],
    value_deserializer=self._deserialize,
    auto_offset_reset='latest',
    group_id="main",
    enable_auto_commit=True,
)

for message in self.consumer:
    data = message.value
    print('Received data', data)
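If the consumer has no committed offset and uses auto_offset_reset='latest', one way to avoid missing a message is to make sure its partitions are assigned and its positions resolved before producing. The helper below is a sketch under that assumption; wait_until_ready is a hypothetical name. It relies only on KafkaConsumer.poll(timeout_ms=...), assignment() and position(tp) from kafka-python, and because it takes any object with those methods it can be exercised without a broker.

```python
import time
from typing import Any, List


def wait_until_ready(consumer: Any, timeout_s: float = 30.0) -> List[Any]:
    """Poll until the consumer has partitions assigned, then look up the
    position of each one, so that with auto_offset_reset='latest' anything
    produced afterwards will be received.

    Returns any records fetched while waiting, so they are not silently lost.
    """
    deadline = time.monotonic() + timeout_s
    early: List[Any] = []
    while not consumer.assignment():
        if time.monotonic() > deadline:
            raise TimeoutError("consumer got no partition assignment in time")
        # poll() returns a dict mapping TopicPartition -> list of records
        batches = consumer.poll(timeout_ms=500)
        for records in batches.values():
            early.extend(records)
    for tp in consumer.assignment():
        consumer.position(tp)  # forces the starting offset to be resolved now
    return early


# Usage sketch (needs a running broker; placeholders as in the question):
# consumer = KafkaConsumer(<topic_name>, bootstrap_servers=<server_list>,
#                          auto_offset_reset='latest', group_id="main")
# wait_until_ready(consumer)
# ... now send with the producer and call flush() ...
# for message in consumer:
#     print(message.value)
```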
Tobias Ernst