
I was trying to replicate the steps given in the blog. The blog gives Python code for a Kafka consumer and a Kafka producer. When I run the code in the Python interactive terminal it works, and the consumer console prints the messages. But when I put the same code in Python files (*.py) and run them, the consumer receives nothing.

Consumer

from kafka import KafkaConsumer
consumer = KafkaConsumer('sample')
for message in consumer:
    print (message)

Producer

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', b'Hello, World!')
producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')

How can I make it work from a Python file?

Tom J Muthirenthi
  • Does it raise any exception? Or it failed silently? – knh190 May 06 '19 at 10:02
  • @knh190 The consumer window (after running) stays still, and the producer ends immediately. – Tom J Muthirenthi May 06 '19 at 10:17
  • Can you validate that the message is truly produced, using the command line as [this post](https://stackoverflow.com/questions/28579948/java-how-to-get-number-of-messages-in-a-topic-in-apache-kafka/28617771#28617771) suggests? – knh190 May 06 '19 at 10:21
  • Or [this gist](https://gist.github.com/ursuad/e5b8542024a15e4db601f34906b30bb5) is more helpful. – knh190 May 06 '19 at 10:23
  • @knh190 The python program fails to produce messages. No new message is created in the topic after running producer.py – Tom J Muthirenthi May 06 '19 at 10:35
  • The [doc](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html) doesn't say whether it can fail silently, but it sounds possible that the Kafka topic was not created, or that something is wrong with the Kafka configuration. You may debug further with the command-line tools to find out whether the problem is on the Kafka side or the Python side. – knh190 May 06 '19 at 11:05
  • @knh190 I just added `producer.flush()` to the producer code, and it started to work. Not sure why that is? – Tom J Muthirenthi May 06 '19 at 12:08

1 Answer


I just added producer.flush() to the producer code, and it started to work.

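Because Kafka clients send messages in batches rather than one at a time, which reduces load on the brokers. `send()` is asynchronous: it only places the record in an in-memory buffer, and a background thread delivers it later.

You didn't send enough data initially for a flush to occur on its own, so your data just sat in memory while your app ended. In the interactive terminal the process stays alive after `send()` returns, which gives the client time to deliver the messages.

Refer to the `batch.size` producer property (`batch_size` in kafka-python).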
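As a minimal sketch of the fix for a standalone script, assuming the same broker address (`localhost:9092`) and topic (`sample`) as in the question: flush before the process exits. The helper name `send_and_flush` and the `main` wrapper are only illustrative and are not part of kafka-python; `send()`, `flush()`, and `close()` are the library's own `KafkaProducer` methods.

```python
def send_and_flush(producer, topic="sample"):
    """Queue the two messages from the question, then wait for delivery."""
    # send() is asynchronous: it only adds the record to an in-memory buffer.
    producer.send(topic, b'Hello, World!')
    producer.send(topic, key=b'message-two', value=b'This is Kafka-Python')
    # flush() blocks until the buffered records have been sent (or have failed).
    producer.flush()
    # close() shuts down the producer and releases its connections.
    producer.close()


def main():
    # Imported here so the helper above can be tried without kafka-python
    # or a running broker (for example with a stand-in producer object).
    from kafka import KafkaProducer

    # Assumption: a broker is listening on localhost:9092, as in the question.
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    send_and_flush(producer)
```

To use this as `producer.py`, call `main()` at the bottom of the file. Start the consumer script first, since a consumer with default settings typically only sees messages produced after it has connected.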

OneCricketeer