I am using kafka-python version 2.0.2 to produce and consume messages. My producer:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='my server')
producer.send('my topic', b'hello word')
# send() is asynchronous; flush() blocks until buffered messages are delivered
producer.flush()

My consumer:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my topic', bootstrap_servers=['192.168.2.137:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id='name')

for message in consumer:
    print(message.value)

When I run the consumer it works fine. But when I stop it before it has read all the messages, it does not continue from where it stopped on the next run. What if my program crashes or my laptop runs out of battery? How can I make the consumer resume from the messages it has not read yet?

matjar trk
  • Can you test this again, but with a delay of 5-6 seconds added to the loop? I assume this is the default behaviour of the client (it waits around 5 seconds before committing). – kucing_terbang Dec 03 '20 at 14:03
  • A delay will not help if the client crashed before the delay ended. – Michael Heil Dec 03 '20 at 14:06
  • I found that if I crash the consumer and run it again, it waits for 10 minutes and then starts reading from the beginning, not from where it stopped. – matjar trk Dec 03 '20 at 14:09
  • @matjartrk, when it crashed, did that happen before or after the delay? As for the 10-minute wait, I guess you need to check the timeout/interval options. – kucing_terbang Dec 04 '20 at 04:29
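
One way to avoid depending on the periodic auto-commit discussed in the comments above is to turn it off and commit explicitly after each message has been handled. The following is only a minimal sketch under stated assumptions: `build_consumer`, `consume_with_manual_commit` and `handle` are hypothetical names introduced for illustration, and the topic, server and group values are placeholders supplied by the caller. It uses `KafkaConsumer.commit()` and `KafkaConsumer.close()` from kafka-python. A crash between handling a message and committing it means that message is delivered again on restart (at-least-once delivery).

```python
def build_consumer(topic, servers, group_id):
    """Create a kafka-python consumer with auto-commit disabled (needs a broker)."""
    # Imported lazily so the loop below can also be run against a stand-in object.
    from kafka import KafkaConsumer
    return KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        auto_offset_reset='earliest',  # only used when the group has no committed offset
        enable_auto_commit=False,      # offsets are committed explicitly below
        group_id=group_id,
    )


def consume_with_manual_commit(consumer, handle):
    """Handle each message, then commit its offset; return the number handled."""
    processed = 0
    try:
        for message in consumer:
            handle(message.value)
            # Synchronous commit of the consumed position. If the process dies
            # before this line, the message is read again on the next start.
            consumer.commit()
            processed += 1
    finally:
        # Leave the group cleanly so a restart is not held up by a stale member.
        consumer.close()
    return processed
```

With a reachable broker this might be called as `consume_with_manual_commit(build_consumer('my-topic', ['192.168.2.137:9092'], 'name'), print)`. Committing after every message costs a round trip each time; committing every N messages is a common compromise, at the price of re-reading up to N messages after a crash.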

1 Answer

Try changing the auto_offset_reset config to 'latest' (this is the default).

from kafka import KafkaConsumer

consumer = KafkaConsumer('my topic', bootstrap_servers=['192.168.2.137:9092'],
                         auto_offset_reset='latest',
                         enable_auto_commit=True,
                         group_id='name')

for message in consumer:
    print(message.value)
sgtcortez
  • This will not help, as the auto_offset_reset configuration is only applicable the very first time you start a client with a new group_id. As soon as the group_id is known to Kafka and offsets have been committed back to Kafka, the auto_offset_reset configuration is ignored. – Michael Heil Dec 03 '20 at 14:13
  • @mike this is not what the Kafka docs say. If the consumer has consumed all the messages and then stops, this property will be read, and as there are no new messages, the pointer goes back to the earliest. – sgtcortez Dec 03 '20 at 14:19
  • The [doc](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#auto.offset.reset) I am reading says: "What to do when there is **no initial** offset in Kafka or if the current offset does not exist any more on the server". As soon as the group_id is committing something, then there *is* an initial offset. – Michael Heil Dec 03 '20 at 14:23
  • @mike I got it. Thanks. – sgtcortez Dec 03 '20 at 14:38
  • You may want to have a look [here](https://stackoverflow.com/questions/44927687/not-clear-about-the-meaning-of-auto-offset-reset-and-enable-auto-commit-in-kafka/63081826#63081826) as well to get some more insights. – Michael Heil Dec 03 '20 at 14:50
  • Running it with latest did not solve the problem. If I do not start from the beginning, I cannot get the old messages; I only get new messages that are produced while the consumer is running. – matjar trk Dec 03 '20 at 15:04
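
The rule quoted from the docs in the comments above (auto_offset_reset applies only when there is no usable committed offset) can be written down as a small model. This is an illustrative sketch, not code from the Kafka client: `starting_offset` and its parameters are hypothetical names, and it looks at a single partition only.

```python
def starting_offset(committed, earliest, latest, auto_offset_reset='latest'):
    """Model of where a consumer group resumes on one partition.

    committed: offset last committed by the group, or None if there is none
    earliest:  oldest offset still available on the broker
    latest:    offset at the end of the log (where the next message will land)
    """
    # A committed offset that still exists on the server always wins.
    if committed is not None and earliest <= committed <= latest:
        return committed
    # No committed offset, or it points at data that no longer exists:
    # only now is auto_offset_reset consulted.
    if auto_offset_reset == 'earliest':
        return earliest
    if auto_offset_reset == 'latest':
        return latest
    raise ValueError("auto_offset_reset must be 'earliest' or 'latest'")


# A group that committed offset 42 resumes at 42 under either setting.
print(starting_offset(42, 0, 100, 'earliest'))
print(starting_offset(42, 0, 100, 'latest'))
# A brand-new group has nothing committed, so the setting decides.
print(starting_offset(None, 0, 100, 'earliest'))
print(starting_offset(None, 0, 100, 'latest'))
```

Under this model, a consumer that keeps starting from the beginning with 'earliest' is one whose offsets never reached the broker, which is consistent with the observation that switching to 'latest' only hides old messages rather than resuming.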