Questions tagged [kafka-python]

kafka-python provides low-level protocol support for Apache Kafka as well as high-level consumer and producer classes. The protocol supports request batching and broker-aware request routing. Gzip and Snappy compression are also supported for message sets.

For more details about the Python Kafka client API, see https://kafka-python.readthedocs.io/en/latest/

443 questions
72
votes
11 answers

Kafka in Docker not working

I am trying to use the wurstmeister/kafka-docker image with docker-compose, but I am having real problems connecting everything. None of the posts or questions I have checked seem to have any problems, but I am frankly lost. (And there are at…
nanounanue
  • 7,942
  • 7
  • 41
  • 73
55
votes
7 answers

How to programmatically create a topic in Apache Kafka using Python

So far I haven't seen a Python client that explicitly implements topic creation without relying on the configuration option that creates topics automatically.
jpgerek
  • 796
  • 1
  • 8
  • 13
42
votes
1 answer

SyntaxError on "self.async" when running python kafka producer

Traceback (most recent call last): File "//producer.py", line 1, in from kafka.producer import KafkaProducer File "/usr/local/lib/python3.9/site-packages/kafka/__init__.py", line 23, in from kafka.producer import…
Harvey
  • 668
  • 2
  • 7
  • 15
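
The traceback in the excerpt above is cut off, but a SyntaxError on `self.async` is what one would expect on Python 3.7 or later, where `async` is a reserved keyword. The following is a minimal sketch using only the standard library; `is_valid_python` is a hypothetical helper for illustration and not part of kafka-python.

```python
import sys


def is_valid_python(source: str) -> bool:
    """Return True if `source` compiles on the running interpreter."""
    try:
        compile(source, "<example>", "exec")
        return True
    except SyntaxError:
        return False


# An attribute named `async` cannot be parsed once `async` is a keyword (3.7+).
uses_keyword = is_valid_python(
    "class P:\n    def __init__(self):\n        self.async = False\n"
)

# The same class with a differently named attribute compiles fine.
renamed = is_valid_python(
    "class P:\n    def __init__(self):\n        self.send_async = False\n"
)

print(sys.version_info[:2], uses_keyword, renamed)
```

If this is the cause, the usual options are to upgrade kafka-python to a release that no longer uses `async` as an identifier, or to run the code on an interpreter older than 3.7.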
36
votes
8 answers

How to get latest offset for a partition for a kafka topic?

I am using the Python high level consumer for Kafka and want to know the latest offsets for each partition of a topic. However I cannot get it to work. from kafka import TopicPartition from kafka.consumer import KafkaConsumer con =…
Saket
  • 3,079
  • 3
  • 29
  • 48
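
The excerpt above is truncated, so the asker's exact code is unknown. As a hedged sketch, one way to read the latest offsets is through `KafkaConsumer.partitions_for_topic` and `KafkaConsumer.end_offsets`, assuming a kafka-python release and broker version that support them. `latest_offsets` and `FakeConsumer` are hypothetical names introduced here so the example runs without a broker.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition  # real class when kafka-python is installed
except Exception:
    # Fallback with the same two fields, so the sketch runs without the library.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def latest_offsets(consumer, topic):
    """Map partition id -> end offset for every known partition of `topic`."""
    partition_ids = consumer.partitions_for_topic(topic) or set()
    tps = [TopicPartition(topic, p) for p in sorted(partition_ids)]
    if not tps:
        return {}
    end = consumer.end_offsets(tps)  # {TopicPartition: offset}
    return {tp.partition: offset for tp, offset in end.items()}


class FakeConsumer:
    """Stand-in exposing only the two methods used above (assumption)."""

    def __init__(self, offsets):
        self._offsets = offsets  # {(topic, partition): end offset}

    def partitions_for_topic(self, topic):
        return {p for (t, p) in self._offsets if t == topic} or None

    def end_offsets(self, partitions):
        return {tp: self._offsets[(tp.topic, tp.partition)] for tp in partitions}


fake = FakeConsumer({("events", 0): 42, ("events", 1): 7})
print(latest_offsets(fake, "events"))
```

With a real cluster, the same function would be called with a `KafkaConsumer` instance in place of `fake`.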
23
votes
3 answers

Does Kafka python API support stream processing?

I have used Kafka Streams in Java. I could not find a similar API in Python. Does Apache Kafka support stream processing in Python?
23
votes
2 answers

How to subscribe to a list of multiple kafka wildcard patterns using kafka-python?

I'm subscribing to Kafka using a pattern with a wildcard, as shown below. The wildcard represents a dynamic customer id. consumer.subscribe(pattern='customer.*.validations') This works well, because I can pluck the customer Id from the topic…
Ben Harrison
  • 2,121
  • 4
  • 24
  • 40
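
Since `subscribe(pattern=...)` takes a single regular expression, one possible approach to the question above is to join several patterns into one alternation. This is a sketch, not a confirmed answer from the thread: `combine_patterns` is a hypothetical helper and the `orders` topic suffix is made up for illustration. Note that the dots are escaped here, whereas the pattern quoted in the question leaves them as regex wildcards.

```python
import re


def combine_patterns(patterns):
    """Join several regex patterns into one anchored alternation."""
    return "^(?:" + "|".join(f"(?:{p})" for p in patterns) + ")$"


combined = combine_patterns(
    [
        r"customer\..*\.validations",
        r"customer\..*\.orders",  # hypothetical second pattern
    ]
)

# Local check of which topic names the combined pattern would select.
for topic in ["customer.42.validations", "customer.7.orders", "customer.42.other"]:
    print(topic, bool(re.match(combined, topic)))

# Assumed usage with an existing KafkaConsumer instance:
# consumer.subscribe(pattern=combined)
```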
16
votes
7 answers

NoBrokersAvailable: NoBrokersAvailable-Kafka Error

I have just started learning Kafka and am trying basic operations on it. I am stuck on a point about the 'Brokers'. My Kafka is running, but when I want to create a partition: from kafka import TopicPartition (ERROR THERE) consumer =…
15
votes
2 answers

Python AVRO reader returns AssertionError when decoding kafka messages

Newbie playing with Kafka and AVRO. I am trying to deserialise AVRO messages in Python 3.7.3 using kafka-python, avro-python3 packages and following this answer. The function responsible for decoding the Kafka messages is def…
Mattia Paterna
  • 1,268
  • 3
  • 15
  • 31
15
votes
7 answers

kafka-python: producer is not able to connect

kafka-python (1.0.0) throws an error while connecting to the broker. At the same time, /usr/bin/kafka-console-producer and /usr/bin/kafka-console-consumer work fine. The Python application also used to work well, but after a ZooKeeper restart, it no longer…
alex_123
  • 211
  • 1
  • 2
  • 7
15
votes
7 answers

kafka-python consumer not receiving messages

I am having trouble getting KafkaConsumer to read from the beginning, or from any other explicit offset. Running the command-line consumer tools for the same topic, I do see messages with the --from-beginning option and it hangs…
Karthik Raj
  • 241
  • 2
  • 3
  • 6
14
votes
2 answers

kafka-python raise UnrecognizedBrokerVersion Error

I am getting this error when constructing KafkaProducer with the kafka-python package: [ERROR] UnrecognizedBrokerVersion: UnrecognizedBrokerVersion Traceback (most recent call last): File "/var/lang/lib/python3.7/imp.py", line 234, in load_module …
Rene B.
  • 6,557
  • 7
  • 46
  • 72
14
votes
2 answers

How to determine API version of Kafka?

I am using kafka-python for accessing Kafka. I try to create a Kafka Producer: kafka_producer = KafkaProducer(bootstrap_servers=['kafka:9092']) but this fails with exception kafka.errors.NoBrokersAvailable: NoBrokersAvailable. I've found out I need…
Michal Špondr
  • 1,337
  • 2
  • 21
  • 44
13
votes
1 answer

Update message in Kafka topic

I am using a Kafka topic from Python. Is there any provision in the producer to update a message in a Kafka queue and append it to the top of the queue again? According to the Kafka spec, it doesn't seem feasible.
Prannoy Mittal
  • 1,525
  • 5
  • 21
  • 32
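
Kafka logs are append-only, so a record cannot be edited in place. A common workaround is to produce a new record with the same key and, on a topic configured for log compaction, let the broker eventually discard older records for that key. The sketch below is a simplified in-memory model of that idea, not Kafka's actual implementation (real compaction keeps the original offsets and runs in the background); `append` and `compact` are hypothetical helpers.

```python
def append(log, key, value):
    """Append a record to the end of the log and return its position."""
    log.append((key, value))
    return len(log) - 1


def compact(log):
    """Keep only the most recent record per key, preserving log order."""
    last_index = {key: i for i, (key, _) in enumerate(log)}
    return [rec for i, rec in enumerate(log) if last_index[rec[0]] == i]


log = []
append(log, "order-1", "created")
append(log, "order-2", "created")
append(log, "order-1", "shipped")  # the "update": same key, new value

compacted = compact(log)
print(compacted)
```

The updated record ends up after the other keys' records, which matches the append-to-the-end behavior the question asks about.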
13
votes
4 answers

How to stop Python Kafka Consumer in program?

I am writing a Python Kafka consumer (trying to use kafka.consumer.SimpleConsumer or kafka.consumer.simple.SimpleConsumer in http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html). When I run the following piece of code, it will run…
BAE
  • 8,550
  • 22
  • 88
  • 171
12
votes
1 answer

Kafka optimal retention and deletion policy

I am fairly new to kafka so forgive me if this question is trivial. I have a very simple setup for purposes of timing tests as follows: Machine A -> writes to topic 1 (Broker) -> Machine B reads from topic 1 Machine B -> writes message just read to…