
Description

I am running a Kafka server on my computer with this docker-compose file. I have created basic consumer and producer applications which I run simultaneously, and I can send and receive messages through a topic as expected.

I have also created test_streamer.py (pasted at the bottom), which creates both a consumer and a producer in the same thread. The script is meant to send a given number of messages to a specified topic with the producer and poll them one by one with the consumer. Unfortunately, this does not work: the program always returns an empty list; no messages are received. It should also be noted that another consumer, running in a separate script in parallel with test_streamer.py, receives every message sent.

To summarise, when I create a consumer in the same thread as a given producer, it does not receive the messages produced by the producer. However, if the clients run in different threads, everything works as expected.

Is the fact that the clients are running in the same thread likely the explanation, or am I missing something? If the former is correct, why? Are consumers and producers not meant to run in the same thread?

confluent-kafka-python version: 1.8.2
librdkafka version: 1.8.2
OS: Ubuntu 20.04

client configurations:

consumer_config = {"bootstrap.servers": IP, "group.id": group}
producer_config = {"bootstrap.servers": IP}

where IP and group are predefined.

How to reproduce

Edit IP in test_streamer.py below and run.

test_streamer.py:

from confluent_kafka import Producer, Consumer

IP = "localhost:9092"
TRIES = 100
topic = "topic1"
group = "group1"

consumer_config = {"bootstrap.servers": IP, "group.id": group}
producer_config = {"bootstrap.servers": IP}

def confirm(err, msg):
    if err:
        print(err)
    else:
        print(msg)


consumer = Consumer(consumer_config)
producer = Producer(producer_config)
consumer.subscribe([topic])
producer.produce(value="init", topic=topic, on_delivery=confirm)
producer.flush()


for i in range(1, TRIES + 1):
    producer.produce(value=str(i), topic=topic)
producer.flush()

messages = []
counter = 0
while counter < 10:
    message = consumer.poll(timeout=1)
    if message:
        counter = 0
        messages.append(message.value())
    else:
        counter += 1
print(messages)

Edit 1:

I have tried adding sleep(10) after the messages have been sent, but it doesn't help. Furthermore, the timing should already be handled by producer.flush(), at least as far as I understand.

Skogis
  • Threading shouldn't matter. You have a few configs that need to be changed... 1) What happens when you use the correct port of 29092? Otherwise, what is actually running on port 9092, or are you running that Python code inside the broker container rather than the Ubuntu host? 2) The consumer defaults to subscribing from the end of the topic, so you should set the starting offset position to the earliest if you are trying to read data produced _before the consumer starts_ – OneCricketeer Jun 26 '22 at 13:33
  • 1) I have changed the port for local apps (running on the host Ubuntu) to 9092. Since it works completely fine when the clients are not running in the same thread, the port seems fine. Do you think the port could still be an issue? 2) It seems that I made a mistake while pasting the code. I subscribe to the topic before I send the messages. Please see the updated code. Would I still need to change to the earliest offset? – Skogis Jun 27 '22 at 05:16
  • The python code is running on the host, not in a container. – Skogis Jun 27 '22 at 05:22
  • From the linked compose file, it is only forwarding port 29092 from the host to the container (see `ports: [29092:29092]`), so if 9092 does work, then you are not connecting to that Kafka container, and are connecting to something else that is not shown in your question. Or it is unclear what/why you changed the Compose file... The order of subscribing shouldn't matter, yes you should still add `'auto.offset.reset': 'earliest'` to your consumer config, and yes, it would be better to create the consumer and subscribe after the producer is flushed – OneCricketeer Jun 27 '22 at 15:18
  • As mentioned above, I changed the port to 9092; that is, I changed `ports: [29092:29092]` to `ports: [9092:9092]`. I don't think it's relevant why I did it, but do you think that it could be a problem? – Skogis Jun 27 '22 at 18:17
  • If you did not change `KAFKA_ADVERTISED_LISTENERS` as well, then yes, that is a problem. The ports value didn't need to be changed; you could have just modified your Python code to use 29092 instead – OneCricketeer Jun 27 '22 at 18:18
  • In any case, with the corrected Docker config, this code works perfectly fine on my machine - https://pastebin.com/1jrU2vfa – OneCricketeer Jun 27 '22 at 18:26
  • I did change `KAFKA_ADVERTISED_LISTENERS`, and as mentioned before, everything works fine when I run two different scripts in parallel. – Skogis Jun 28 '22 at 08:28
  • And the pastebin I posted also works. Two scripts aren't necessary – OneCricketeer Jun 28 '22 at 13:38
  • Thanks, that's very useful to know. Could you please paste the line/lines you modified in the docker-compose file? – Skogis Jun 28 '22 at 18:01
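
To illustrate the `auto.offset.reset` suggestion from the comment thread, here is a minimal sketch of the consumer configuration. The broker address and group name are placeholders taken from the question and compose-file discussion; whether this alone resolves the issue depends on the broker setup.

```python
# Sketch of the consumer config with the setting suggested in the comments.
# The address and group name below are assumptions for illustration only.
IP = "localhost:29092"  # the port the compose file actually forwards, per the comments
group = "group1"

consumer_config = {
    "bootstrap.servers": IP,
    "group.id": group,
    # Without this, a consumer group with no committed offsets starts at the
    # *end* of each partition ("latest"), so messages produced before the
    # consumer's first partition assignment are never seen. "earliest" makes
    # the first poll start from the beginning of the partition instead.
    "auto.offset.reset": "earliest",
}
print(consumer_config["auto.offset.reset"])
```

This config dict can then be passed to `Consumer(consumer_config)` as in test_streamer.py above.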

0 Answers