3

I use pykafka to fetch messages from a Kafka topic, do some processing, and then update MongoDB. Since pymongo can update only one item at a time, I start 100 processes. But on startup, some processes raise the errors "PartitionOwnedError" and "ConsumerStoppedException". I don't know why. Thank you.

import datetime

from pykafka import KafkaClient

kafka_cfg = conf['kafka']
kafka_client = KafkaClient(kafka_cfg['broker_list'])
topic = kafka_client.topics[topic_name]

balanced_consumer = topic.get_balanced_consumer(
    consumer_group=group,
    auto_commit_enable=kafka_cfg['auto_commit_enable'],
    zookeeper_connect=kafka_cfg['zookeeper_list'],
    zookeeper_connection_timeout_ms=kafka_cfg['zookeeper_conn_timeout_ms'],
    consumer_timeout_ms=kafka_cfg['consumer_timeout_ms'],
)
while True:
    for msg in balanced_consumer:
        if msg is not None:
            try:
                value = eval(msg.value)
                id = long(value.pop("id"))
                value["when_update"] = datetime.datetime.now()
                query = {"_id": id}

                # upsert=True creates the document if it does not exist
                result = collection.update_one(query, {"$set": value}, True)
            except Exception as e:
                log.error("Fail to update: %s, msg: %s", e, msg.value)

The first traceback:

Traceback (most recent call last):
  File "dump_daily_summary.py", line 182, in <module>
    dump_daily_summary.run()
  File "dump_daily_summary.py", line 133, in run
    for msg in self.balanced_consumer:
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
    message = self.consume(block=True)
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 734, in consume
    raise ConsumerStoppedException
pykafka.exceptions.ConsumerStoppedException

The second traceback:

Traceback (most recent call last):
  File "dump_daily_summary.py", line 182, in <module>
    dump_daily_summary.run()
  File "dump_daily_summary.py", line 133, in run
    for msg in self.balanced_consumer:
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
    message = self.consume(block=True)
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 726, in consume
    self._raise_worker_exceptions()
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 271, in _raise_worker_exceptions
    raise ex
pykafka.exceptions.PartitionOwnedError
raymond
  • 31
  • 3

3 Answers

1

PartitionOwnedError: check whether some background process is consuming in the same consumer_group; there may not be enough available partitions to start another consumer.
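To verify this, a minimal sketch along these lines (the broker address and topic name are placeholders) compares the partition count against the number of consumers you plan to start:

    from pykafka import KafkaClient

    # Placeholder broker address and topic name -- substitute your own.
    client = KafkaClient(hosts="127.0.0.1:9092")
    topic = client.topics["my_topic"]

    # A balanced consumer group cannot make use of more members than
    # there are partitions; any extra consumers sit idle.
    print "partitions available:", len(topic.partitions)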

ConsumerStoppedException: you can try upgrading your pykafka version (https://github.com/Parsely/pykafka/issues/574)
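To check which version you currently have installed (assuming your release exposes __version__, which recent pykafka versions do):

    import pykafka

    # Print the installed pykafka version to see whether an upgrade applies.
    print pykafka.__version__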

raittes
  • 5,271
  • 3
  • 30
  • 27
  • Thanks for the answer. There are 100 partitions in the topic and no other processes in the same group. I think maybe the processes I started reach a balance over the 100 partitions, and each new process then seizes partitions and breaks the balance? – raymond Nov 09 '16 at 09:10
0

I met the same problem as you. But I was confused about others' solutions, like adding enough partitions for the consumers or upgrading the pykafka version. In fact, mine already satisfied those conditions.

Here are the versions of the tools:

python 2.7.10

kafka 2.11-0.10.0.0

zookeeper 3.4.8

pykafka 2.5.0

Here is my code:

class KafkaService(object):
    def __init__(self, topic):
        self.client_hosts = get_conf("kafka_conf", "client_host", "string")
        self.topic = topic
        self.con_group = topic
        self.zk_connect = get_conf("kafka_conf", "zk_connect", "string")

    def kafka_consumer(self):
        """kafka-consumer client, using pykafka

        :return: {"id": 1, "url": "www.baidu.com", "sitename": "baidu"}
        """
        from pykafka import KafkaClient
        consumer = None
        try:
            kafka = KafkaClient(hosts=str(self.client_hosts))
            topic = kafka.topics[self.topic]

            consumer = topic.get_balanced_consumer(
                consumer_group=self.con_group,
                auto_commit_enable=True,
                zookeeper_connect=self.zk_connect,
            )
        except Exception as e:
            logger.error(str(e))
            return  # no consumer was created; don't fall through to the loop

        while True:
            # block=False is the key change: the blocking path is what
            # raises the two exceptions here
            message = consumer.consume(block=False)
            if message:
                print "message:", message.value
                yield message.value

The two exceptions (ConsumerStoppedException and PartitionOwnedError) are raised by the consume(block=True) function of pykafka.balancedconsumer.

Of course, I recommend reading the source code of that function.

There is an argument block=True; after changing it to False, the program no longer falls into those exceptions.

Then the Kafka consumers work fine.
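One usage note: a non-blocking consume in a tight loop will spin when the topic is idle, so it is common to sleep briefly between empty polls. A minimal sketch, where consumer is the balanced consumer built above, and the 0.1-second interval and the process() handler are placeholders:

    import time

    while True:
        message = consumer.consume(block=False)
        if message is not None:
            process(message.value)  # placeholder for your own handling
        else:
            # No message ready: back off briefly instead of busy-waiting.
            time.sleep(0.1)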

CHENJIAN
  • 1,810
  • 13
  • 24
0

This behavior is affected by a longstanding bug that was recently discovered and is currently being fixed. The workaround we've used in production at Parse.ly is to run our consumers in an environment that automatically restarts them when they crash with these errors, until all partitions are owned.
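The same idea can be sketched in-process. This is only an illustration (the hosts, topic, group, and handle() names are placeholders), not the exact Parse.ly setup; an external supervisor such as systemd or supervisord achieves the same effect:

    import time

    from pykafka import KafkaClient
    from pykafka.exceptions import ConsumerStoppedException, PartitionOwnedError

    def run_consumer_forever():
        while True:
            try:
                client = KafkaClient(hosts="127.0.0.1:9092")  # placeholder
                consumer = client.topics["my_topic"].get_balanced_consumer(
                    consumer_group="my_group",
                    zookeeper_connect="127.0.0.1:2181",
                )
                for msg in consumer:
                    if msg is not None:
                        handle(msg)  # placeholder for your processing
            except (ConsumerStoppedException, PartitionOwnedError):
                # Crashed during a rebalance: back off briefly and rejoin
                # the group until all partitions are owned again.
                time.sleep(5)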

Emmett Butler
  • 5,969
  • 2
  • 29
  • 47