Description
I am running a Kafka server on my computer with this docker-compose file. I have created basic consumer and producer applications which I run simultaneously, and I can send and receive messages through a topic as expected.
I have also created test_streamer.py (pasted at the bottom), which creates both a consumer and a producer in the same thread. The script is meant to send a given number of messages to a specified topic with the producer, then poll them one by one with the consumer. Unfortunately, this does not work: the program always returns an empty list, i.e. no messages are received. Note also that another consumer, running in a separate script in parallel with test_streamer.py, receives every message sent.
To summarise, when I create a consumer in the same thread as a given producer, it does not receive the messages produced by the producer. However, if the clients run in different threads, everything works as expected.
Is the fact that the clients are running in the same thread likely the explanation, or am I missing something? If the former is correct, why? Are consumers and producers not meant to run in the same thread?
confluent-kafka-python version: 1.8.2
librdkafka version: 1.8.2
Client configurations:

```python
consumer_config = {"bootstrap.servers": IP, "group.id": group}
producer_config = {"bootstrap.servers": IP}
```

where `IP` and `group` are predefined.
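One setting that may be worth ruling out before blaming threads (an assumption, not a confirmed diagnosis): when a consumer group has no committed offset, librdkafka picks the starting position from `auto.offset.reset`, which defaults to the latest offset. A sketch of the same configuration with that setting made explicit, using the placeholder `IP` and `group` values from the script:

```python
# Placeholder values, matching test_streamer.py; adjust for your broker.
IP = "localhost:9092"
group = "group1"

consumer_config = {
    "bootstrap.servers": IP,
    "group.id": group,
    # Assumption: start from the beginning of each partition when the group
    # has no committed offset. The default ("latest") skips anything that was
    # produced before the consumer's partitions were assigned.
    "auto.offset.reset": "earliest",
}
producer_config = {"bootstrap.servers": IP}
```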
OS: Ubuntu 20.04
How to reproduce
Edit `IP` in test_streamer.py below and run it.
test_streamer.py:
```python
from confluent_kafka import Producer, Consumer

IP = "localhost:9092"
TRIES = 100
topic = "topic1"
group = "group1"

consumer_config = {"bootstrap.servers": IP, "group.id": group}
producer_config = {"bootstrap.servers": IP}


def confirm(err, msg):
    # Delivery report callback: print the error, or the delivered message.
    if err:
        print(err)
    else:
        print(msg)


consumer = Consumer(consumer_config)
producer = Producer(producer_config)
consumer.subscribe([topic])

# Send one message with a delivery report, then TRIES numbered messages.
producer.produce(value="init", topic=topic, on_delivery=confirm)
producer.flush()
for i in range(1, TRIES + 1):
    producer.produce(value=str(i), topic=topic)
producer.flush()

# Poll until 10 consecutive polls (about 10 s) return nothing.
messages = []
counter = 0
while counter < 10:
    message = consumer.poll(timeout=1)
    if message:
        counter = 0
        messages.append(message.value())
    else:
        counter += 1
print(messages)
```
Edit 1:
I have tried adding `sleep(10)` after the messages have been sent, but it doesn't help. Furthermore, the timing should already be handled by `producer.flush()`, at least as far as I understand.
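A possible explanation, offered as an assumption rather than a diagnosis: `subscribe()` returns before any partitions are assigned, and assignment only happens inside a later `poll()`. If the group has no committed offset, the start position is then taken from `auto.offset.reset` (latest by default), which lies after every message that `flush()` has already delivered. That would also explain why `sleep(10)` after producing changes nothing. The pure-Python model below uses hypothetical `SimPartition`/`SimConsumer` classes and no Kafka at all; it mimics only that timing.

```python
# Hypothetical, simplified model of one topic partition and one consumer.
# It does NOT use confluent_kafka; it only mimics two behaviours:
#   1. subscribe() does not fix a position; assignment happens in poll().
#   2. With no committed offset, the start position comes from
#      auto.offset.reset ("latest" by default, or "earliest").


class SimPartition:
    def __init__(self):
        self.log = []  # stored message values; list index == offset

    def append(self, value):
        self.log.append(value)


class SimConsumer:
    def __init__(self, partition, offset_reset="latest"):
        self.partition = partition
        self.offset_reset = offset_reset
        self.position = None  # None means "not assigned yet"

    def subscribe(self):
        # Like the real client, subscribing alone does not set a position.
        pass

    def poll(self):
        if self.position is None:
            # First poll: partition assigned, start offset resolved now.
            if self.offset_reset == "earliest":
                self.position = 0
            else:  # "latest": start after everything already in the log
                self.position = len(self.partition.log)
        if self.position < len(self.partition.log):
            value = self.partition.log[self.position]
            self.position += 1
            return value
        return None  # nothing new (the real client would wait for timeout)


def run(offset_reset, wait_for_assignment=False, tries=5):
    partition = SimPartition()
    consumer = SimConsumer(partition, offset_reset)
    consumer.subscribe()
    if wait_for_assignment:
        consumer.poll()  # get assigned before producing anything
    for i in range(1, tries + 1):
        partition.append(str(i))  # stands in for produce() + flush()
    # A sleep here would change nothing: the position is still unset.
    received = []
    while (value := consumer.poll()) is not None:
        received.append(value)
    return received


print(run("latest"))                            # → []
print(run("earliest"))                          # → ['1', '2', '3', '4', '5']
print(run("latest", wait_for_assignment=True))  # → ['1', '2', '3', '4', '5']
```

If this model matches what happens, the corresponding changes in the real script would be either setting `auto.offset.reset` to `earliest`, or polling until partitions are assigned (for example by passing an `on_assign` callback to `consumer.subscribe()`) before producing. This would also fit the observation that the separately started consumer, already assigned when the messages arrive, receives all of them.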