
I am looking for a way to log all Kafka topics/events. Is there an environment variable I can set when running my Kafka container? Just for development purposes, I used

docker run -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1 johnnypark/kafka-zookeeper

and when I ran my Python scripts, I was able to consume and publish events:

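# consumer script: prints every record from 'other-topic'
from kafka import KafkaConsumer

consumer = KafkaConsumer('other-topic')

for msg in consumer:
    print(msg)

# producer script: sends one message to 'other-topic'
from kafka import KafkaProducer

producer = KafkaProducer()

producer.send('other-topic', b'some_message_bytes')

producer.flush()  # block until the message is actually sent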

I bet there is a way to log all events, but I could not find it.

PS. I want to log all events only for development purposes. I do not want to add handlers to watch every topic, but if there is a way to consume all topics with the KafkaConsumer class, it would be great to know about that too.

  • https://stackoverflow.com/questions/39520222/how-to-subscribe-to-a-list-of-multiple-kafka-wildcard-patterns-using-kafka-pytho – Ran Lupovich Jun 18 '21 at 09:35

2 Answers


You can use a regex pattern to define which topics to consume from. Have a look at the following example from another question:

How to subscribe to a list of multiple kafka wildcard patterns using kafka-python?

def subscribe(self, topics=(), pattern=None, listener=None):
    """Subscribe to a list of topics, or a topic regex pattern
    Partitions will be dynamically assigned via a group coordinator.
    Topic subscriptions are not incremental: this list will replace the
    current assignment (if there is one).
    """
    ...  # rest of the kafka-python docstring and implementation omitted

For your use case, you could use the pattern '.*'.
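A quick way to check which topic names a pattern selects is to run it through Python's `re` module. This sketch assumes kafka-python compiles the pattern with `re.compile` and tests each topic name with `.match()` (anchored at the start of the name), which is consistent with the `re.error` reported in the comments. `matching_topics` is a hypothetical helper, not part of kafka-python.

```python
import re


def matching_topics(pattern: str, topics: list[str]) -> list[str]:
    """Return the topic names a subscription pattern would select.

    Assumption: the pattern is compiled with Python's re module and checked
    with .match(), i.e. anchored at the start of the topic name.
    """
    regex = re.compile(pattern)  # '*' alone raises re.error: nothing to repeat
    return [topic for topic in topics if regex.match(topic)]


topics = ["other-topic", "customer-created", "orders"]
print(matching_topics(".*", topics))          # every topic
print(matching_topics("customer.*", topics))  # only the customer-* topic

try:
    matching_topics("*", topics)
except re.error as exc:
    print("invalid pattern:", exc)
```

A bare `*` is a shell-style wildcard, not a regular expression; in regex syntax "any topic" is `.*`.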

For new topics, there is a configurable parameter that controls how long after a topic is created the consumer subscribes to it. Check this answer as well:

Kafka pattern subscription. Rebalancing is not being triggered on new topic
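A minimal development-only sketch, assuming kafka-python and a broker on `localhost:9092`. The parameter in question is most likely the consumer's metadata refresh interval, which kafka-python exposes as `metadata_max_age_ms` (default 300000 ms, i.e. 5 minutes). A pattern subscription can only notice a new topic after a refresh, so lowering this value should make new topics show up sooner. `dev_consumer_config` and `log_all_topics` are hypothetical names, and the 5-second value is an arbitrary choice for local use.

```python
def dev_consumer_config(bootstrap_servers="localhost:9092", refresh_ms=5000):
    """Hypothetical helper: KafkaConsumer settings for local development.

    metadata_max_age_ms is how often (in ms) the consumer refreshes cluster
    metadata; pattern subscriptions only see new topics after a refresh.
    """
    return {
        "bootstrap_servers": bootstrap_servers,
        "metadata_max_age_ms": refresh_ms,  # assumption: 5 s is fine for dev
        "auto_offset_reset": "earliest",    # start at the oldest record if no committed offset
    }


def log_all_topics(**overrides):
    """Print every record from every topic matching '.*'."""
    from kafka import KafkaConsumer  # kafka-python; imported only when called

    consumer = KafkaConsumer(**dev_consumer_config(**overrides))
    consumer.subscribe(pattern=".*")
    for msg in consumer:
        print(msg.topic, msg.partition, msg.offset, msg.value)
```

Calling `log_all_topics()` blocks and prints one line per record. Pass e.g. `refresh_ms=1000` to refresh metadata more often, at the cost of more metadata requests to the broker.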

Ran Lupovich
  • Thanks for your answer. Unfortunately, it works with patterns like 'customer*', but it throws an error with '*': `re.error: nothing to repeat at position 0` –  Jun 18 '21 at 10:16
  • Can you try something like ".*"? – Ran Lupovich Jun 18 '21 at 10:44
  • It works only for existing topics, but not for new ones :( –  Jun 18 '21 at 11:05
  • I think we will not be able to solve it with the KafkaConsumer class because `pattern (str): Pattern to match available topics. You must provide either topics or pattern, but not both.` –  Jun 18 '21 at 11:14
  • So it will match all currently available topics, and I think it will ignore topics created later. –  Jun 18 '21 at 11:15
  • That should actually work. Check this out – Ran Lupovich Jun 18 '21 at 13:22
  • https://stackoverflow.com/questions/38754865/kafka-pattern-subscription-rebalancing-is-not-being-triggered-on-new-topic – Ran Lupovich Jun 18 '21 at 13:22

You can consume from multiple topics using the dpkp/kafka-python library. As mentioned in its KafkaConsumer documentation, you can simply initialize the consumer with the list of topics you want.

class kafka.KafkaConsumer(*topics, **configs)

Parameters:
*topics (str) – optional list of topics to subscribe to. If not set, call subscribe() or assign() before consuming records

Setting topics when you initialize the consumer is optional. You can later call consumer.subscribe() with either a list of topics or a pattern that matches topic names.

def subscribe(self, topics=(), pattern=None, listener=None):

Arguments:

topics (list): List of topics for subscription.

pattern (str): Pattern to match available topics. You must provide either topics or pattern, but not both.

If you don't know which topics exist in Kafka, you can call consumer.topics() to get all of them.

def topics(self):

Get all topics the user is authorized to view. This will always issue a remote call to the cluster to fetch the latest information.

    Returns:
        set: topics
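If a one-off snapshot is enough, the two calls above can be combined: fetch the current topic names with `consumer.topics()` and pass them to `consumer.subscribe(topics=...)`. This is a sketch assuming a kafka-python `KafkaConsumer`. The helper names are hypothetical, and skipping names that start with `__` (internal topics such as `__consumer_offsets`) is a defensive assumption. Topics created after the call are not picked up.

```python
def user_topics(all_topics):
    """Sorted topic names, skipping internal ones that start with '__' (assumption)."""
    return sorted(t for t in all_topics if not t.startswith("__"))


def subscribe_to_all_current_topics(consumer):
    """Subscribe a kafka-python KafkaConsumer to every topic that exists right now.

    Topics created after this call are NOT included.
    """
    topics = user_topics(consumer.topics())  # remote call for the latest list
    if topics:  # subscribe() needs at least one topic (or a pattern)
        consumer.subscribe(topics=topics)
    return topics
```

For example, create `consumer = KafkaConsumer(bootstrap_servers='localhost:9092')`, call `subscribe_to_all_current_topics(consumer)`, then iterate with the usual `for msg in consumer:` loop.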


nipuna
  • Hmm, not bad. But I am looking for something more dynamic, because once I define the list of topics, I will not be able to subscribe to new ones. –  Jun 18 '21 at 10:24
  • Just an idea; I haven't tried this myself. You would have to restart the consumer for that: run a separate thread in the background that fetches the topic list and compares it with the previous one. If there are new topics, restart the consumer with the new list. – nipuna Jun 18 '21 at 13:20
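The refresh idea from the last comment can be sketched without a separate thread. kafka-python's documentation notes that `KafkaConsumer` is not thread-safe, so re-checking `consumer.topics()` inside the poll loop avoids sharing the consumer across threads. Because `subscribe()` replaces the current subscription, this sketch re-subscribes instead of recreating the consumer. It assumes a kafka-python `KafkaConsumer`. `needs_resubscribe` and `log_all_topics_polling` are hypothetical helpers, and the 10-second check interval is arbitrary.

```python
import time


def needs_resubscribe(known, current):
    """True if the set of user topics changed and there is something to subscribe to."""
    return bool(current) and set(current) != set(known)


def log_all_topics_polling(consumer, check_every_s=10.0, max_polls=None):
    """Print records from all user topics, re-subscribing when the topic list changes.

    Assumes `consumer` is a kafka-python KafkaConsumer used from one thread only.
    """
    known = set()
    next_check = float("-inf")
    polls = 0
    while max_polls is None or polls < max_polls:
        if time.monotonic() >= next_check:
            # Skip internal topics such as __consumer_offsets (defensive filter).
            current = {t for t in consumer.topics() if not t.startswith("__")}
            if needs_resubscribe(known, current):
                # subscribe() replaces the previous subscription; no restart needed.
                consumer.subscribe(topics=sorted(current))
                known = current
            next_check = time.monotonic() + check_every_s
        if known:
            # poll() returns {TopicPartition: [ConsumerRecord, ...]}
            for records in consumer.poll(timeout_ms=1000).values():
                for record in records:
                    print(record.topic, record.partition, record.offset, record.value)
        else:
            time.sleep(1)  # no topics yet; wait before the next check
        polls += 1
```

The worst-case delay before a new topic is logged is roughly `check_every_s` plus one poll timeout, which is usually acceptable for development logging.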