Questions tagged [confluent-kafka-python]

Confluent Kafka Python is a performant implemention of Kafka producers, consumers and the admin client in Python and it is based on librdkafka.

Confluent Kafka Python is a performant implemention of a Kafka client library based on librdkafka. You can use it to connect to Kafka brokers in Python.

219 questions
19
votes
7 answers

Unable to install confluent-kafka: "fatal error: librdkafka/rdkafka.h: No such file or directory"

I am using the confluent-kafka Python client in my project. I'm trying to create a Docker image with this client. I am facing the following error:- #11 8.015 [pipenv.exceptions.InstallError]: In file included from…
8
votes
1 answer

Ubuntu 18.04 python3.7 confluent-kafka no module named 'confluent_kafka.cimpl'

I have VM run on ubuntu 18.04. On that I installed: python3.7 confluent-kafka when run python script, I met this message error: from .cimpl import (Consumer, #noqa ModuleNotFoundError: no module named 'confluent_kafka.cimpl' I've…
cobi.hien
  • 81
  • 1
  • 2
8
votes
2 answers

Error while using confluent-kafka python library with AWS lambda

I am trying to use the confluent-kafka python library to administer my cluster via a lambda function but the function fails with the error: "Unable to import module 'Test': No module named 'confluent_kafka.cimpl'" My…
6
votes
0 answers

confluent kakfa producer KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}

I'm new to Kafka, using confluent kafka and trying to write messages to existing kafka topic from AWS EC2 instance using python producer code with 'sasl.mechanism': 'PLAIN','security.protocol': 'SASL_SSL'.I tried the producer example from this link.…
marjun
  • 696
  • 5
  • 17
  • 30
6
votes
2 answers

Get Latest Message for a Confluent Kafka Topic in Python

Here's what I've tried so far: from confluent_kafka import Consumer c = Consumer({... several security/server settings skipped... 'auto.offset.reset': 'beginning', 'group.id': 'my-group'}) c.subscribe(['my.topic']) msg…
4
votes
0 answers

Kafka consumer disconnect - Disconnected (after 684744ms in state UP)

I have a problem in my kafka consumer. When I run my consumer in localhost he run correctly, but when I run my consumer in another environment he show this error %6|1629836068.968|rdkafka#consumer-1|…
Leorfk
  • 41
  • 1
4
votes
2 answers

How to determine if a kafka topic exists using confluent-kafka-python

I'm using the confluent-kafka-python package to interface with a Kafka server. I can successfully create the topic and push events to it. However, my problem lies when I spin up multiple nodes (running in Docker), if a second instance also tries to…
nalyd88
  • 4,940
  • 8
  • 35
  • 51
4
votes
0 answers

How can I unit test code which uses a confluent_kafka Consumer?

I'm trying to unit test the following piece of code using pytest: import json from typing import Any, Dict from confluent_kafka import Consumer def get_message(config: Dict[str, Any]): consumer = Consumer( { "group.id":…
Pitirus
  • 117
  • 1
  • 13
4
votes
1 answer

Kafka consumer does not continue from where it left off

I'm currently using consumer groups to read messages from kafka. I have noticed however, that if my consumer goes down and I bring it back up again, it does not consume messages from where it left off. After reading the documentation here, it seems…
Vivek Rao
  • 576
  • 4
  • 25
3
votes
0 answers

Inconsistent results when counting Kafka records using ksqldb and confluent-kafka

I wish to count the number of records in a given Kafka topic to set up a monitoring endpoint as a kind of 'sanity check' for data-in vs. data-out. I noticed that doing this with confluent-kafka 1.9.2 produces results that differ from those produced…
filpa
  • 3,651
  • 8
  • 52
  • 91
3
votes
1 answer

Stream elasticsearch data to kafka

I have a requirement to stream the data from Elasticsearch to Kafka. I'm looking at a any connectors between Elasticsearch and Kafka (Python implementation) that could either automatically identified CDC and streams the data to kafka or i can…
RAVITEJA SATYAVADA
  • 2,503
  • 23
  • 56
  • 88
3
votes
1 answer

Set group's offsets to latest with Python's confluent-kafka (one-time admin action)

Say that I have a topic my-topic and a group my-group, and that my clients use the confluent-kafka Python package. Consumers are configured with "auto.offset.reset": "earliest" to ensure that all messages are processed at least once. Now say I…
3
votes
1 answer

How do I get the the offset of last message of a Kafka topic using confluent-kafka-python?

I need to retrive last N messages of a topic using confluent-kafka-python. I've been reading https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html# for a day, but no finding any appropriate method for getting the…
wxh
  • 619
  • 7
  • 20
3
votes
0 answers

Python confluent-kafka package return returns "Disconnected while requesting ApiVersion"

Environment os: macOS python library confluent_kafka==1.7.0 external kafka broker version 2.6.1 not specify any security protocol, everything is default PLAINTEXT run this from confluent_kafka import Consumer c = Consumer( { …
Denis Sered
  • 80
  • 2
  • 7
3
votes
2 answers

How to get parameter hints in VSCode for pyd files in general and confluent_kafka specifically?

I am new to kafka, and trying work some basic examples via VScode. The problem is I can't seem to make parameter hints work at all for all the artefacts imported via confluent_kafka. The module itself is a wrapper, and I was wondering if there was a…
Imad
  • 2,358
  • 5
  • 26
  • 55
1
2 3
14 15