From the Faust documentation I can't find out how to set the consumer to a specific offset.

With confluent-kafka I use consumer.offsets_for_times to find a start_offset and then assign the TopicPartition to that specific offset, something like:

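from confluent_kafka import TopicPartition

# consumer is an existing confluent_kafka.Consumer.
# start_date is a Unix timestamp in milliseconds, passed in the offset field.
start_offset = consumer.offsets_for_times([
    TopicPartition("prediction.OfferPredictionCheckpoint", 0, int(start_date)),
    TopicPartition("prediction.OfferPredictionCheckpoint", 1, int(start_date)),
])

# Each returned TopicPartition carries the resolved offset, so the list can be assigned as-is.
consumer.assign(start_offset)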

With Faust I can't find much more than:

consumer_auto_offset_reset

This only lets you choose earliest or latest. How would I start reading from a specific hour, or from the beginning of the day?

galinden

2 Answers

To set the offset to a specific value, you can use the example below. Here I set the offset to 50000, so every time I start my app, the agent starts reading at offset 50000. For this I use app.consumer.seek.

Here TP takes two parameters: the topic name (test in this case) and the partition number (0). For more information, see faust.types.

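import faust
from faust.types import TP

# Assumed app setup; the app id and broker URL are placeholders.
app = faust.App("seek-example", broker="kafka://localhost:9092")

tp = TP("test", 0)  # topic "test", partition 0
topic = app.topic(tp.topic)

@app.task()
async def on_start():
    # Runs at startup: move partition 0 of "test" to offset 50000.
    await app.consumer.seek(tp, 50000)
    print("App started")

@app.agent(topic)
async def receive(stream):
    async for event in stream.events():
        print((event.message.offset, event.value))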
Abhishek
Phil997

I think this might be what you're looking for: https://faust.readthedocs.io/en/latest/reference/faust.transport.consumer.html#faust.transport.consumer.Consumer.seek

It can seek to a specific offset. However, I don't think there's an easy way to tell it to go to a specific time or date without some extra logic (maybe binary search your way there using the offset?).
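
To make the binary search idea a bit more concrete, here is a rough sketch in plain Python. It is not Faust API: `first_offset_at_or_after` and `timestamp_at` are hypothetical names, and in a real app `timestamp_at` would have to seek to an offset and read that one record's timestamp. It also assumes timestamps never decrease as offsets increase, which Kafka does not guarantee when producers set the timestamps themselves.

```python
from typing import Callable


def first_offset_at_or_after(
    target_ms: int,
    low: int,
    high: int,
    timestamp_at: Callable[[int], int],
) -> int:
    """Return the smallest offset in [low, high) whose timestamp is >= target_ms.

    Returns high if every record in the range is older than target_ms.
    Assumes timestamps never decrease as offsets increase.
    """
    while low < high:
        mid = (low + high) // 2
        if timestamp_at(mid) < target_ms:
            low = mid + 1  # record at mid is too old, look to the right
        else:
            high = mid  # record at mid qualifies, but an earlier one might too
    return low


# Stand-in for one partition: list index = offset, value = record timestamp in ms.
fake_log = [1_000, 2_000, 2_000, 3_500, 5_000]

start = first_offset_at_or_after(2_000, 0, len(fake_log), fake_log.__getitem__)
```

The result would then be the offset you pass to `seek` for that partition. Each partition needs its own search, since offsets are per partition.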

BWStearns
  • Do you have an example of how to use the `seek` method? It needs partitions as input, and I'm not really sure how to provide them. – matino Sep 02 '20 at 12:49
  • @matino it looks like you want to iterate over all the partitions and seek them individually. That's my guess. I can't find a kafka example but I found this non-faust kafka guide to do the same thing here (search topicPartitionIterator) http://blog.empeccableweb.com/wp/2016/11/30/manual-offsets-in-kafka-consumers-example/ – BWStearns Sep 02 '20 at 16:32
  • I managed to go with `aiokafka`, by passing `auto_offset_reset="earliest"` to the `AIOKafkaConsumer`. – matino Sep 02 '20 at 16:48
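
For the "iterate over all the partitions and seek them individually" suggestion in the comments, a minimal sketch might look like this. It only assumes what the accepted answer shows, namely an awaitable `seek(tp, offset)` on the consumer. The `TP` class here is a stand-in with the same two fields as `faust.types.TP`, `RecordingConsumer` is a hypothetical fake so the snippet runs without a broker, and `seek_all` and the offsets are made up for illustration.

```python
import asyncio
from typing import Dict, List, NamedTuple, Tuple


class TP(NamedTuple):
    """Stand-in with the same fields as faust.types.TP."""
    topic: str
    partition: int


class RecordingConsumer:
    """Hypothetical stand-in for app.consumer; it only records seek calls."""

    def __init__(self) -> None:
        self.seeks: List[Tuple[TP, int]] = []

    async def seek(self, tp: TP, offset: int) -> None:
        self.seeks.append((tp, offset))


async def seek_all(consumer, topic: str, offsets: Dict[int, int]) -> None:
    """Seek each partition of `topic` to its own starting offset."""
    for partition, offset in sorted(offsets.items()):
        await consumer.seek(TP(topic, partition), offset)


# Illustrative offsets only: partition number -> offset to start from.
consumer = RecordingConsumer()
asyncio.run(seek_all(consumer, "test", {0: 50000, 1: 42000}))
```

In a Faust app you would presumably call something like `seek_all(app.consumer, ...)` from an `@app.task()` function, the same place the answer above calls `seek`.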