
I want to parse data from a Kafka topic, specifically the messages that were inserted into the topic yesterday. The topic contains more than 600 billion records, and I need only the new data. I have a parser that filters on the timestamp: if a message's timestamp is later than the cutoff date, the message is parsed. But this takes a lot of time. For example:

import datetime
import json

# The cutoff is constant, so compute it once outside the loop
cutoff = datetime.datetime.strptime('2020-06-04 00:00:00', '%Y-%m-%d %H:%M:%S')

num_rows = 0
for msg in consumer:
    # msg.timestamp (msg[3]) is in milliseconds since the epoch
    dt_object = datetime.datetime.fromtimestamp(msg.timestamp / 1000)

    if dt_object > cutoff:
        print(dt_object.strftime('%Y-%m-%d %H:%M:%S'))
        num_rows += 1
        m = json.loads(msg.value)


Michael Heil
LOTR
  • Does this answer your question? [Python KafkaConsumer start consuming messages from a timestamp](https://stackoverflow.com/questions/46402672/python-kafkaconsumer-start-consuming-messages-from-a-timestamp) – Roy2012 Jun 05 '20 at 06:06

1 Answer


You can use offsets_for_times to find, for each partition, the offset of the first message at or after the timestamp from which you'd like to consume, and then seek to that offset with the consumer before consuming.

You can see an example of this here: https://github.com/confluentinc/confluent-kafka-python/issues/497
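For illustration, here is a minimal sketch of that pattern with kafka-python (the client the question's code appears to use); the broker address, topic name, and partition number are assumptions:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')  # assumed broker address

# Assign the partition explicitly so that seek() is allowed
tp = TopicPartition('my_topic', 0)  # assumed topic name and partition
consumer.assign([tp])

# 2020-06-04 00:00:00 UTC as milliseconds since the epoch
start_ms = 1591228800000

# Ask the broker for the earliest offset whose timestamp is >= start_ms
offsets = consumer.offsets_for_times({tp: start_ms})
if offsets[tp] is not None:  # None means no message at or after start_ms
    consumer.seek(tp, offsets[tp].offset)

# Every message from here on already has timestamp >= start_ms,
# so no per-message filtering is needed
for msg in consumer:
    print(msg.value)

Because the broker performs the timestamp lookup, the consumer jumps straight to the first relevant offset instead of reading and discarding all of the older records.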

Robin Moffatt
  • My topic contains 12 partitions, and I couldn't understand the documentation. What should I add? consumer.subscribe([param["TOPIC_IN"]]) # tp = TopicPartition(topic, 0) # offsets = consumer.offsets_for_times({tp: 1591084496000}) – LOTR Jun 05 '20 at 08:02
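Regarding the 12-partition follow-up in the comment above: a hedged sketch of how the commented-out lines could be extended to cover every partition of the topic; the broker address is an assumption, and param["TOPIC_IN"] is taken from the comment:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')  # assumed broker address

param = {"TOPIC_IN": "my_topic"}  # placeholder for the user's config
topic = param["TOPIC_IN"]
start_ms = 1591084496000  # the timestamp from the comment, in milliseconds

# One TopicPartition per partition of the topic (0..11 for 12 partitions)
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]

# Use assign() rather than subscribe() so that seek() is permitted
consumer.assign(partitions)

# Look up the starting offset for all partitions in a single call
offsets = consumer.offsets_for_times({tp: start_ms for tp in partitions})
for tp, offset_ts in offsets.items():
    if offset_ts is not None:  # None means no message at or after start_ms
        consumer.seek(tp, offset_ts.offset)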