
I am having trouble getting KafkaConsumer to read from the beginning, or from any other explicit offset.

Running the command-line consumer tool against the same topic, I do see messages with the --from-beginning option; without it, the tool hangs:

$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning

If I run it through Python, it hangs, which I suspect is caused by incorrect consumer configs:

consumer = KafkaConsumer(topic_name,
                     bootstrap_servers=['localhost:9092'],
                     group_id=None,
                     auto_commit_enable=False,
                     auto_offset_reset='smallest')

print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value

print "Quit"

Output:

Consuming messages from the given topic (hangs after that)

I am using kafka-python 0.9.5 and the broker runs Kafka 0.8.2. I am not sure what the exact problem is.

Set _group_id=None_, as suggested by dpkp, to emulate the behavior of the console consumer.

Karthik Raj
  • I recently downloaded the kafka package and tried your code, and it works for me. Can you show the content of your `consumer.properties` file? – Víctor M Feb 05 '16 at 07:51
  • http://stackoverflow.com/questions/34684410/kafka-consumer-losing-state-of-messages-after-shutdown/34685654#34685654 you may need to set the start offset... – BAE Feb 05 '16 at 13:15
  • Tried setting the starting offset too, it didn't help either. – Karthik Raj Feb 08 '16 at 02:06
  • I was testing it with a topic that has more than one partition, and it turns out the issue arises only when the producer does not produce enough messages for every partition to have at least one message in it. https://issues.apache.org/jira/browse/KAFKA-3159 The consumer works fine if all the partitions have at least one message. – Karthik Raj Feb 08 '16 at 02:18
  • Also, KafkaConsumer does not throw exceptions for unsupported codecs, which bit me: I was using lz4, which the consumer does not support yet, so it neither decoded the messages nor threw an exception. – Karthik Raj Feb 09 '16 at 22:15
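Since the consumer in that version skipped LZ4 data without raising an error, one cheap safeguard is to confirm up front that the Python module behind a compression type can be imported. This is a sketch that assumes kafka-python's usual optional dependencies (gzip from the standard library, `python-snappy` for snappy, the `lz4` package for lz4); `CODEC_MODULES` and `codec_available` are hypothetical names, and an importable module does not by itself prove that the installed kafka-python version supports that codec.

```python
import importlib

# Assumed mapping from Kafka compression type to the Python module
# that kafka-python relies on to (de)compress it.
CODEC_MODULES = {
    "gzip": "gzip",      # standard library
    "snappy": "snappy",  # provided by the python-snappy package
    "lz4": "lz4.frame",  # provided by the lz4 package
}


def codec_available(codec: str) -> bool:
    """Return True if the module backing `codec` can be imported."""
    module = CODEC_MODULES.get(codec)
    if module is None:
        raise ValueError(f"unknown compression type: {codec!r}")
    try:
        importlib.import_module(module)
    except ImportError:  # also covers ModuleNotFoundError
        return False
    return True
```

A consumer script could then fail fast, e.g. `if not codec_available("lz4"): raise SystemExit("install the lz4 package")`, instead of silently reading nothing.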

7 Answers


The difference between the console consumer and the Python consumer code you have posted is that the Python consumer uses a consumer group to save offsets: group_id="test-consumer-group". If you set group_id=None instead, you should see the same behavior as the console consumer.

dpkp
  • Yeah, that was one of the problems. The actual issue was that the producer was using _lz4_ as the compression type, which wasn't supported by the Python consumer, and the consumer bailed out without a warning or error. – Karthik Raj Mar 17 '16 at 03:04
  • LZ4 support was added to kafka-python in 1.0; the latest version should also no longer silently fail on compression errors. – dpkp Mar 17 '16 at 05:29
  • This faulty behaviour (for single-node/partitioned Kafka) had me spinning for quite some time until I found this answer. – khawarizmi Aug 14 '18 at 13:35
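To see why `group_id` matters here, below is a toy model (plain Python, no broker needed) of how a consumer picks its starting offset for one partition: a committed offset for the group wins if it is still in range; only otherwise does the `auto_offset_reset` policy decide. The function `starting_offset` and its parameters are hypothetical, and this is a simplification of Kafka's behavior, not kafka-python code.

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    reset: str = "latest") -> int:
    """Toy model of where a consumer begins reading one partition.

    committed: offset previously committed by the consumer group, or None
               (no group, e.g. group_id=None, or a brand-new group).
    log_start: offset of the oldest message still retained.
    log_end:   offset the next produced message will get.
    reset:     'earliest' or 'latest' (kafka-python 1.x spelling;
               0.9.x used 'smallest' / 'largest').
    """
    # A valid committed offset always wins; the reset policy is ignored.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No usable committed offset: fall back to the reset policy.
    if reset == "earliest":
        return log_start
    if reset == "latest":
        return log_end
    raise ValueError(f"unsupported reset policy: {reset!r}")
```

With no group there is never a committed offset, so `starting_offset(None, 0, 20, "earliest")` returns 0, like `--from-beginning`. A group that already committed offset 20 resumes at 20 even with `"earliest"`, and `"latest"` without a commit starts at 20 and waits for new messages, which looks exactly like a hang.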

I ran into the same problem: I can receive messages in the Kafka console consumer but can't get them with a Python script using the kafka-python package.

Finally, I figured out that the reason was that I didn't call producer.flush() and producer.close() in my producer.py, which is not mentioned in its documentation.
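In kafka-python, `KafkaProducer.send()` is asynchronous: it buffers the record and returns a future, so a short script can exit before anything reaches the broker. A minimal sketch of the flush-then-close pattern, assuming a kafka-python `KafkaProducer` (any object with `send`, `flush` and `close` works); `publish_all` is a hypothetical helper name.

```python
from typing import Iterable


def publish_all(producer, topic: str, values: Iterable[bytes]) -> int:
    """Send every value to `topic`, then block until they are delivered.

    `producer` is assumed to be a kafka-python KafkaProducer; only its
    send(), flush() and close() methods are used.
    """
    count = 0
    try:
        for value in values:
            producer.send(topic, value)  # asynchronous: only buffers the record
            count += 1
        producer.flush()  # block until all buffered records have been sent
    finally:
        producer.close()  # release network resources even on error
    return count
```

Usage would look like `publish_all(KafkaProducer(bootstrap_servers=['localhost:9092']), 'my-topic', [b'msg0', b'msg1'])`. Calling `flush()` explicitly keeps delivery independent of whatever cleanup the library may or may not do at interpreter exit.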


auto_offset_reset='earliest' solved it for me.

Pavel Lisiza

auto_offset_reset='earliest' and group_id=None solved it for me.

techkuz
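The two settings from the last answers can be bundled as keyword arguments for `KafkaConsumer` in kafka-python 1.x/2.x. This sketch also adds `consumer_timeout_ms`, an assumption beyond these answers, so that `for message in consumer` stops after a quiet period instead of blocking forever; `from_beginning_config` is a hypothetical helper name.

```python
def from_beginning_config(bootstrap_servers, idle_timeout_ms=10_000):
    """Keyword arguments for kafka.KafkaConsumer that read a topic from the start.

    Roughly mirrors `kafka-console-consumer --from-beginning`.
    """
    return {
        "bootstrap_servers": list(bootstrap_servers),
        "group_id": None,                 # no consumer group, so no committed offsets
        "auto_offset_reset": "earliest",  # start at the oldest retained message
        "enable_auto_commit": False,      # nothing to commit without a group
        "consumer_timeout_ms": idle_timeout_ms,  # end iteration after this much idle time
    }
```

For example: `consumer = KafkaConsumer('my-topic', **from_beginning_config(['localhost:9092']))`.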

My take: print the offsets and make sure they are what you expect, using position() and seek_to_beginning(). Please see the comments in the code.

I can't explain:

  1. Why, after instantiating KafkaConsumer, are the partitions not assigned? Is this by design? The workaround is to call poll() once before seek_to_beginning().
  2. Why, sometimes after seek_to_beginning(), does the first call to poll() return no data and not change the offset?

Code:

import kafka
print(kafka.__version__)
from kafka import KafkaProducer, KafkaConsumer
from time import sleep
KAFKA_URL = 'localhost:9092' # kafka broker
KAFKA_TOPIC = 'sida3_sdtest_topic' # topic name

# ASSUMING THAT the topic exist

# write to the topic
producer = KafkaProducer(bootstrap_servers=[KAFKA_URL])
for i in range(20):
    producer.send(KAFKA_TOPIC, ('msg' + str(i)).encode() )
producer.flush()

# read from the topic
# auto_offset_reset='earliest', # auto_offset_reset is needed when offset is not found, it's NOT what we need here
consumer = KafkaConsumer(KAFKA_TOPIC,
                         bootstrap_servers=[KAFKA_URL],
                         max_poll_records=2,
                         group_id='sida3')

# (!?) why do we need this to get partitions assigned?
# AssertionError: No partitions are currently assigned if poll() is not called
consumer.poll()
consumer.seek_to_beginning()

# also AssertionError: No partitions are currently assigned if poll() is not called
print('partitions of the topic: ',consumer.partitions_for_topic(KAFKA_TOPIC))

from kafka import TopicPartition
print('before poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))

# (!?) sometimes the first call to poll() returns nothing and doesn't change the offset
messages = consumer.poll()
sleep(1)
messages = consumer.poll()

print('after poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))

print('messages: ', messages)

Output:

2.0.1
partitions of the topic:  {0, 1}
before poll() x2: 
0
0
after poll() x2: 
0
2
messages:  {TopicPartition(topic='sida3_sdtest_topic', partition=1): [ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=0, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg0', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1), ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=1, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1)]}
Sida Zhou
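A sketch of workarounds for the two open points in the previous answer. On point 1: when the topic is passed to the constructor (a subscription), partitions are assigned only when the consumer joins its group, and that happens inside `poll()`. Assigning partitions manually with `assign()` on a consumer created *without* a topic avoids the extra poll. On point 2: `poll()` defaults to `timeout_ms=0` in kafka-python, so it can return before the first fetch response arrives; polling with a timeout in a loop is likely more reliable than `sleep()`. The kafka-python calls used are `partitions_for_topic()`, `assign()`, `seek_to_beginning()` and `poll(timeout_ms=...)`; `assign_from_beginning`, `drain` and `max_empty_polls` are hypothetical names.

```python
def assign_from_beginning(consumer, topic, tp_cls=None):
    """Manually assign all partitions of `topic` and seek each to its start.

    `consumer` is assumed to be a kafka-python KafkaConsumer created WITHOUT
    a topic argument, because assign() cannot be mixed with subscribe().
    """
    if tp_cls is None:
        from kafka import TopicPartition as tp_cls  # kafka-python's (topic, partition) tuple
    partition_ids = consumer.partitions_for_topic(topic)
    if not partition_ids:
        raise ValueError(f"no partition metadata for topic {topic!r}")
    partitions = [tp_cls(topic, p) for p in sorted(partition_ids)]
    consumer.assign(partitions)              # manual assignment: no group join, no poll() needed
    consumer.seek_to_beginning(*partitions)  # offset lookup happens on the next poll()/position()
    return partitions


def drain(consumer, timeout_ms=1000, max_empty_polls=3):
    """Poll until `max_empty_polls` consecutive polls return nothing.

    poll() is assumed to return {TopicPartition: [ConsumerRecord, ...]}.
    Returns (partition, offset, value) tuples in the order received.
    """
    received = []
    empty_polls = 0
    while empty_polls < max_empty_polls:
        batch = consumer.poll(timeout_ms=timeout_ms)
        if not batch:
            empty_polls += 1
            continue
        empty_polls = 0  # data arrived, reset the idle counter
        for tp, records in batch.items():
            for record in records:
                received.append((tp.partition, record.offset, record.value))
    return received
```

With the answer's names, usage would be `consumer = KafkaConsumer(bootstrap_servers=[KAFKA_URL], group_id='sida3')`, then `assign_from_beginning(consumer, KAFKA_TOPIC)` and `drain(consumer)`.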

I faced the same issue before, so I ran kafka-topics locally on the machine running the code to test it, and I got an UnknownHostException. I added the IP and the host name to the hosts file, and then it worked fine in both kafka-topics and the code. It seems that KafkaConsumer was trying to fetch the messages but failed without raising any exceptions.

HGF
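After bootstrapping, the client connects to the host names the broker advertises in its metadata, so it can reach the bootstrap address and still fail on the next connection if it cannot resolve those names, which would fit the hosts-file fix above. A quick standard-library check, assuming `host:port` strings; `unresolvable` is a hypothetical helper, and it only tests name resolution, not whether the port is reachable.

```python
import socket


def unresolvable(addresses):
    """Return the 'host:port' entries whose host name cannot be resolved."""
    failed = []
    for address in addresses:
        host, _, port = address.rpartition(":")
        host = host.strip("[]")  # allow bracketed IPv6 literals like [::1]:9092
        try:
            socket.getaddrinfo(host, int(port))
        except socket.gaierror:
            failed.append(address)
    return failed
```

Running it on the client machine against the broker's advertised addresses (for example an internal host name such as `kafka:29092`) would show which entries need a hosts-file or DNS fix.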

For me, I had to specify the router's IP in the kafka PLAINTEXT configuration.

Get the router's IP with:

echo $(ifconfig | grep -E "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep -v 127.0.0.1 | awk '{ print $2 }' | cut -f2 -d: | head -n1)

and then add PLAINTEXT_HOST://<router_ip>:9092 to the Kafka advertised listeners. For a Confluent Docker service, the configuration is as follows:

   kafka:
    image: confluentinc/cp-kafka:7.0.1
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://172.28.0.1:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

And finally, the Python consumer is:

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['172.28.0.1:9092'],
    auto_offset_reset = 'earliest',
    group_id=None,
)

print('Listening')
for msg in consumer:
    print(msg)

Charalamm
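In the Docker Compose setup in the last answer, the broker advertises two listeners: `PLAINTEXT://kafka:29092` for clients inside the Docker network and `PLAINTEXT_HOST://172.28.0.1:9092` for clients on the host. A small sketch that parses a `KAFKA_ADVERTISED_LISTENERS` value into a name-to-(host, port) mapping can help confirm which address a given client should use as `bootstrap_servers`; `parse_listeners` is a hypothetical helper, not part of Kafka or kafka-python.

```python
def parse_listeners(value):
    """Parse 'NAME://host:port,NAME2://host:port' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, sep, address = entry.strip().partition("://")
        if not sep:
            raise ValueError(f"malformed listener: {entry!r}")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners
```

For the compose file above, `parse_listeners(...)["PLAINTEXT_HOST"]` gives `("172.28.0.1", 9092)`, which matches the `bootstrap_servers` value in that answer's Python consumer.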