Questions tagged [pykafka]

About

PyKafka is a cluster-aware Kafka>=0.8.2 client for Python. It includes Python implementations of Kafka producers and consumers, which are optionally backed by a C extension built on librdkafka, and runs under Python 2.7+, Python 3.4+, and PyPy.

55 questions
9
votes
1 answer

How to make kafka-python or pykafka work as an async producer with uwsgi and gevent?

My stack is uWSGI with gevent. I am trying to wrap my API endpoints with a decorator that pushes all request data (URL, method, body, and response) to a Kafka topic, but it's not working. My theory is that it's because I am using gevent, and I am trying to run…
Harsh M
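
For the decorator idea in this question, a minimal sketch (leaving the gevent interaction aside) could look like the following. It assumes the producer is any object with a `produce(bytes)` method, as PyKafka's producers have; `kafka_logged`, `ListProducer`, `echo`, and the `(url, method, body)` handler signature are hypothetical, since real framework handlers receive requests differently. PyKafka's `KafkaClient` also accepts a `use_greenlets` flag intended for gevent deployments, which may be worth checking separately; this sketch does not depend on it.

```python
import functools
import json


def kafka_logged(producer):
    """Decorator factory: after the wrapped handler returns, send the URL,
    method, body and response to Kafka as one JSON-encoded message.

    `producer` is assumed to expose produce(bytes), as PyKafka's producers do.
    """
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(url, method, body):
            response = handler(url, method, body)
            record = {"url": url, "method": method, "body": body, "response": response}
            # Kafka message values are raw bytes, so encode the JSON text.
            producer.produce(json.dumps(record).encode("utf-8"))
            return response
        return wrapper
    return decorator


class ListProducer:
    """Local stand-in that only records messages; swap in a real producer."""

    def __init__(self):
        self.sent = []

    def produce(self, message):
        self.sent.append(message)


# Usage with the stand-in; `echo` is a hypothetical endpoint handler.
fake = ListProducer()


@kafka_logged(fake)
def echo(url, method, body):
    return {"status": 200, "echo": body}


echo("/items", "POST", "hello")
```

Sending after the handler returns keeps the response in the logged record; with an asynchronous producer, `produce` only enqueues the message, so the request is not held up waiting for the broker.
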
6 votes, 4 answers

How to read messages from kafka consumer group without consuming?

I'm managing a Kafka queue using a common consumer group across multiple machines. Now I also need to show the current contents of the queue. How do I read only those messages within the group that haven't been read, while still making those messages again…
Priyam Singh
5 votes, 2 answers

PyKafka metadata in bytes instead of strings

I'm seeing unusual behaviour with PyKafka, a client I only recently started using. The error is the following: Failed to connect newly created broker for b'4758e4ee1af6':9092 {0:
Chobeat
4 votes, 0 answers

High latency PyKafka

We are building a real-time product that receives images and sends back the associated information. For scalability, we decided to use Kafka to balance the workload among Kubernetes nodes. Frontend -> MainWorker -> Kafka (1) -> Worker -> Kafka (2)…
Max0u
4 votes, 1 answer

How does a publisher publish messages to a topic in Apache Kafka?

I am new to Apache Kafka. I do not understand the anatomy of topics and partitions in Apache Kafka, or the way a producer pushes data to a partition. Consider a scenario: I have two producers, PR1 and PR2, three brokers, B1, B2, and B3, and one topic, T1, with…
Green
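
To illustrate how a producer might pick a partition of T1, here is a sketch of the general idea: messages with a key always map to the same partition (preserving per-key order), and messages without a key are spread across partitions. Whichever partition is chosen, the producer sends the message to that partition's leader broker (one of B1 to B3). This is not the exact algorithm of any particular client: the Java client hashes keys with murmur2, and PyKafka producers accept a `partitioner` argument (for example its `HashingPartitioner`). `zlib.crc32` and the `SketchPartitioner` class are stand-ins chosen for illustration.

```python
import itertools
import zlib


class SketchPartitioner:
    """Illustrative partition chooser: stable hash for keyed messages,
    round-robin for unkeyed ones. Not any real client's exact algorithm."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._next = itertools.cycle(range(num_partitions))

    def choose(self, key=None):
        if key is None:
            # No key: rotate through partitions to spread the load evenly.
            return next(self._next)
        # Same key -> same partition, so messages for one key stay in order.
        return zlib.crc32(key) % self.num_partitions


# With three partitions, unkeyed messages cycle 0, 1, 2, 0, ...
p = SketchPartitioner(3)
unkeyed = [p.choose() for _ in range(4)]
same_key = p.choose(b"user-42") == p.choose(b"user-42")
```

Because both producers PR1 and PR2 would apply the same key-to-partition mapping, messages with the same key from either producer land in the same partition.
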
3 votes, 0 answers

Produce/Consume to Remote Kafka Does not Work

I have set up an AWS EC2 instance running Apache Kafka 0.8 from a Bitnami AMI. The server properties are mostly the defaults (Kafka at localhost:9092 and ZooKeeper at localhost:2181). When I SSH into the machine, I can…
3 votes, 2 answers

Kafka and PySpark Integration

I am new to big data, and I am trying to connect Kafka to Spark. Here is my producer code: import os import sys import pykafka def get_text(): ## This block generates my required text. text_as_bytes=text.encode(text) …
3 votes, 3 answers

Why do I get PartitionOwnedError and ConsumerStoppedException errors when starting a few consumers?

I use pykafka to fetch messages from a Kafka topic, do some processing, and then write updates to MongoDB. Because pymongo can update only one item at a time, I start 100 processes. But on startup, some processes raised the errors "PartitionOwnedError…
raymond
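
A related point worth checking when starting 100 processes in one consumer group: each partition is owned by at most one consumer in the group, so if the topic has fewer partitions than consumers, some consumers own nothing. The sketch below shows this with a simplified range-style assignment (contiguous blocks of partitions per consumer). It is an illustration of the idea, not PyKafka's actual rebalancing code, and `range_assign` is a hypothetical name.

```python
def range_assign(partitions, consumers):
    """Split sorted partitions into contiguous ranges, one per sorted consumer.

    Consumers beyond the partition count receive an empty list.
    """
    partitions = sorted(partitions)
    consumers = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment = {}
    start = 0
    for i, consumer in enumerate(consumers):
        # The first `extra` consumers take one additional partition each.
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment


# 10 partitions shared by 100 worker processes: only 10 workers get any work.
workers = [f"w{i:03d}" for i in range(100)]
assignment = range_assign(range(10), workers)
busy = sum(1 for owned in assignment.values() if owned)
```

Scaling consumers past the partition count therefore adds idle members (and more rebalancing), not throughput.
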
2 votes, 2 answers

Write a CSV file to a Kafka topic

I have a large CSV file that I want to write to a Kafka topic. def producer(): producer = KafkaProducer(bootstrap_servers='mykafka-broker') with open('/home/antonis/repos/testfile.csv') as file: reader = csv.DictReader(file,…
e7lT2P
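
A minimal sketch of the approach in this question, assuming the producer behaves like kafka-python's `KafkaProducer` (`send(topic, value=...)` to queue a message, `flush()` to wait for delivery). Each row is serialized as JSON; that encoding and the function name `csv_to_topic` are choices made for illustration, not something the question specifies.

```python
import csv
import json


def csv_to_topic(producer, topic, csv_file):
    """Send each CSV row to `topic` as a JSON-encoded message.

    `producer` is assumed to behave like kafka-python's KafkaProducer:
    send(topic, value=bytes) queues a message, flush() waits for delivery.
    Returns the number of rows sent.
    """
    reader = csv.DictReader(csv_file)
    count = 0
    for row in reader:
        # Rows are streamed one at a time, so a large file is never fully loaded.
        producer.send(topic, value=json.dumps(row).encode("utf-8"))
        count += 1
    producer.flush()  # block until all queued messages have been sent
    return count
```

With a real broker, the call might be `csv_to_topic(KafkaProducer(bootstrap_servers='mykafka-broker'), 'csv-rows', f)` on a file opened with `newline=''`, as the `csv` module recommends; the topic name `csv-rows` is hypothetical. Calling `flush()` once at the end, rather than after every row, lets the producer batch messages.
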
2 votes, 0 answers

How to dynamically create Kafka producers

First, I am taking baby steps in Python and Kafka. Say I have listA=[item1, item2, item3], and every item of listA is a producer on a topic. What I want is to dynamically add/remove items in listA and have them immediately become producers as well…
bihire boris
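
One way to let items be added to and removed from listA at runtime is to keep a dict from item to producer and start or stop producers as the list changes. In this sketch `make_producer` is a hypothetical factory you supply (with PyKafka it might wrap something like `client.topics[name].get_producer()`), and each producer is assumed to provide `produce(bytes)` and `stop()`, as PyKafka's producers do. `ProducerRegistry` is likewise a hypothetical name.

```python
class ProducerRegistry:
    """Keeps one producer per item; items can be added or removed at runtime.

    `make_producer(item)` is a caller-supplied factory (hypothetical here);
    each producer is assumed to provide produce(bytes) and stop().
    """

    def __init__(self, make_producer):
        self._make_producer = make_producer
        self._producers = {}

    def sync(self, items):
        """Make the registry match `items`: start new producers, stop removed ones."""
        wanted = set(items)
        current = set(self._producers)
        for item in wanted - current:
            self._producers[item] = self._make_producer(item)
        for item in current - wanted:
            # Stop the producer so any queued messages are flushed before it goes away.
            self._producers.pop(item).stop()

    def produce(self, item, message):
        self._producers[item].produce(message)

    def items(self):
        return sorted(self._producers)
```

Calling `sync(listA)` after each change to the list keeps the set of live producers in step with it; items already present keep their existing producer.
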
2 votes, 2 answers

PyKafka API usage

I am a newbie to Kafka and PyKafka. I know that a producer and a consumer are created in PyKafka with the code below. from pykafka import KafkaClient client = KafkaClient("localhost:9092") topic = client.topics["topicname"] producer =…
2 votes, 1 answer

Pykafka - sending messages and receiving acknowledgments asynchronously

PyKafka has the limitation that the "delivery report queue is thread-local: it will only serve reports for messages which were produced from the current thread". I'm trying to write a script where I can asynchronously send messages using one function,…
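
Because the reports are thread-local, one workable pattern is to confine the producer to a single thread: other threads hand messages over through a queue, and the owning thread both produces and drains delivery reports. This is a sketch under assumptions: `producer_factory` is a hypothetical function you supply (for example one returning `topic.get_producer(delivery_reports=True)`), and the producer is assumed to offer PyKafka's `produce()`, `get_delivery_report(block=False)` (which raises `queue.Empty` when no report is waiting), and `stop()`.

```python
import queue
import threading


class ProducerThread(threading.Thread):
    """Owns a producer on one thread so produce() and get_delivery_report()
    are always called from the same thread.

    `producer_factory()` and `on_report(msg, exc)` are caller-supplied;
    on_report runs on this thread, with exc None for successful deliveries.
    """

    def __init__(self, producer_factory, on_report):
        super().__init__(daemon=True)
        self._factory = producer_factory
        self._on_report = on_report
        self._outbox = queue.Queue()

    def send(self, message):
        """Callable from any thread: queue a message for the owning thread."""
        self._outbox.put(message)

    def close(self):
        """Ask the thread to finish after everything already queued."""
        self._outbox.put(None)  # sentinel

    def _drain_reports(self, producer):
        while True:
            try:
                msg, exc = producer.get_delivery_report(block=False)
            except queue.Empty:
                return
            self._on_report(msg, exc)

    def run(self):
        producer = self._factory()  # created and used only on this thread
        while True:
            try:
                message = self._outbox.get(timeout=0.1)
            except queue.Empty:
                self._drain_reports(producer)  # idle: still collect reports
                continue
            if message is None:
                break
            producer.produce(message)
            self._drain_reports(producer)
        producer.stop()  # assumed to wait for in-flight messages
        self._drain_reports(producer)  # collect any final reports
```

Callers only ever touch `send()` and `close()`, so they never need the producer's thread-local state; the callback is where acknowledgments (or failures) surface asynchronously.
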
2 votes, 0 answers

Kafka long coordinator load time and small ISRs

I'm using Kafka 0.8.2.1, running a topic with 200 partitions and RF=3, with log retention set to about 1GB. An unknown event caused the cluster to enter the "coordinator load" or "group load" state. A few signals made this apparent: the…
Emmett Butler
2 votes, 1 answer

Why is producer in pykafka so slow?

I wrote a simple producer using pykafka, but I can't get it to perform well. The basic producer and the call to produce are below. When I call this 100 times with a small message and add some timing/profiling code, it takes about 14 seconds. I…
GregH
2 votes, 1 answer

How to select the starting offset in a PyKafka SimpleConsumer?

On a single-partition topic in my Kafka cluster, I have a simple consumer processing all incoming messages. If there is an error in the processed data, I want to reprocess, in the same order, all messages from a certain offset (not the beginning) to fix the…
Giuseppe