This question is similar to "Python KafkaConsumer start consuming messages from a timestamp", except I want to know how to do it with the official Python Kafka client by Confluent.

I looked into the Consumer.offsets_for_times function, but I'm confused that it accepts timestamps in the TopicPartition.offset field.

How is an offset equivalent to a timestamp?

wxh

2 Answers

I did this recently for $work. You need to get the result of offsets_for_times(), then assign() that list to your consumer, and then call consume(). Importantly, don't subscribe() to the topic. (See Eden Hill's comment on https://github.com/confluentinc/confluent-kafka-python/issues/373).
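The steps above can be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the helper name `assign_from_timestamp` and its `partition_ids` parameter are made up for illustration, and the `confluent_kafka` import is done lazily inside the function so the rest of a module can load without the client installed. It only uses `TopicPartition`, `Consumer.offsets_for_times()` and `Consumer.assign()`.

```python
def assign_from_timestamp(consumer, topic, partition_ids, when_ms, timeout=10.0):
    """Point `consumer` at the first message at or after `when_ms` (epoch ms).

    `consumer` is expected to be a confluent_kafka.Consumer that has NOT
    called subscribe(). Returns the list that was passed to assign().
    """
    # Lazy import (an illustrative choice, see the note above).
    from confluent_kafka import TopicPartition

    # On the way in, the `offset` field carries the timestamp to search for.
    query = [TopicPartition(topic, p, when_ms) for p in partition_ids]

    # On the way out, `offset` holds the message offset found for that timestamp.
    resolved = consumer.offsets_for_times(query, timeout=timeout)

    # Manual assignment with explicit start offsets; no subscribe() involved.
    consumer.assign(resolved)
    return resolved
```

After this call, consumer.consume() (or poll()) in your usual loop should start reading from the resolved offsets. Check each returned TopicPartition's error and offset fields before relying on them, since a partition may have no message at or after the requested time.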

You're right that the documentation for this function is somewhat confusing when it comes to defining timestamps vs offsets.
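One way to read the docs: the offset field is just reused as the input slot for the timestamp, and the call hands back, per partition, the earliest offset whose message timestamp is greater than or equal to it. Here is a toy model of that lookup for a single partition. It is only an illustration of the semantics, not how the broker actually does the search, and `offset_for_time` is a hypothetical name.

```python
from typing import Optional, Sequence


def offset_for_time(timestamps_ms: Sequence[int], target_ms: int) -> Optional[int]:
    """Toy partition: list index == offset, list value == message timestamp (ms)."""
    for offset, ts in enumerate(timestamps_ms):
        if ts >= target_ms:
            return offset  # earliest message at or after the target
    return None  # nothing at or after the target in this partition


partition_log = [1000, 1500, 1500, 2300, 4000]
print(offset_for_time(partition_log, 1500))  # 1
print(offset_for_time(partition_log, 1501))  # 3
print(offset_for_time(partition_log, 5000))  # None
```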

Update to answer followup question:

The difference from "How do I get the the offset of last message of a Kafka topic using confluent-kafka-python?" is that rather than

topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]

you would do something like this:

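from datetime import datetime
from confluent_kafka import TopicPartition

whents = datetime.fromisoformat("2022-01-01T12:34:56.000")
whenms = int(whents.timestamp() * 1000)   # epoch milliseconds; a naive datetime is treated as local time

topicparts = [TopicPartition(topic_name, i, whenms) for i in range(0, 8)]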
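
One thing to watch when building the millisecond value: a datetime without timezone info is interpreted in the machine's local timezone by .timestamp(). A small helper along these lines makes the choice explicit. The name `to_epoch_ms` is made up for this sketch, and treating naive values as UTC is an assumption you may want to change.

```python
from datetime import datetime, timezone


def to_epoch_ms(when: datetime) -> int:
    """Convert a datetime to integer milliseconds since the Unix epoch."""
    if when.tzinfo is None:
        # Assumption for this sketch: naive datetimes are meant as UTC.
        when = when.replace(tzinfo=timezone.utc)
    return int(when.timestamp() * 1000)


whenms = to_epoch_ms(datetime.fromisoformat("2022-01-01T12:34:56.000"))
print(whenms)  # 1641040496000
```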
James McPherson
  • Thanks for sharing the info. How do I define `TopicPartition` params for `offsets_for_times()`? Can you elaborate? Thanks – wxh Feb 20 '22 at 05:15
  • I found that using timestamp as offset was actually implemented as part of [KIP-79](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090#KIP79ListOffsetRequest/ListOffsetResponsev1andaddtimestampsearchmethodstothenewconsumer-offsetsForTimes()methodinConsumer) in 2016. It's an actual feature. Thanks. – wxh Feb 21 '22 at 16:34
That method doesn't accept timestamps; only partitions that you want to find timestamps for.

https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.TopicPartition.TopicPartition

Perhaps you mean the timeout parameter?

OneCricketeer
  • That's exactly my point. It's what's written in the documentation I linked, which is confusing. – wxh Feb 19 '22 at 20:17
  • I don't understand what you think is confusing. You define a list of TopicPartition objects and it returns timestamps for each. You would provide the `OFFSET_STORED` value for the offset parameter – OneCricketeer Feb 20 '22 at 07:24
  • No, the function does the exact opposite. It will "Look up offsets by timestamp for the specified partitions." Read the doc. – wxh Feb 20 '22 at 12:48