
I have two topics:

  • main_topic
  • retry_topic

If the processing logic fails for a message from main_topic, I want the message to be sent to retry_topic. If it fails there as well, it should be retried again, up to a maximum of 3 retries.

I've tried using a sink in Faust streaming. It forwards the yielded result from my main topic to retry_topic, but I still can't limit the number of retries to 3.

Is there a way to do this in Faust/Kafka streaming? I know that Celery has this feature.

  • In the Celery case, are you talking about [autoretry_for](https://docs.celeryq.dev/en/stable/userguide/tasks.html#automatic-retry-for-known-exceptions)? Do you mean you want the Faust agent to try 3 times to process a message coming from `main_topic`, and if it fails 3 times, send the message to `retry_topic`? I don't understand what you want to achieve. Could you give more detail and a code example of what you've done and what you expect? – Thomas Nov 22 '22 at 15:31
  • @Thomas, I don't know what the author meant, but I can't find exactly what you described. According to my requirements, my service should process a message with three retries and, after 3 failures, send the message to a dead letter topic. In the Java Kafka client this is just one of the configuration parameters. Do I understand correctly that in Faust I need to implement my own logic for this? – Gekster Dec 27 '22 at 12:43
  • Yes, I am not aware of a retry mechanism in Faust. However, several kinds of errors can occur (broker not available, error in processing, etc.), so it would be complex to handle everything correctly. Usually, I put all my processing in a `try/except` and send the message to a dead letter topic in the `except` part. You can count the exceptions if you want to try 3 times, but I am not sure this is the right way to do what you want. – Thomas Jan 03 '23 at 13:13

1 Answer

One way to achieve that is to carry the retry count in the message headers.

import asyncio
import faust

@app.agent(topic)
async def topic_handler(stream: faust.Stream):
    async for event in stream:
        try:
            await process_event(event)
        except Exception as e:
            logger.error(f'Error processing event: {e!r}')
            # The retry count travels with the message in its headers.
            headers = stream.current_event.headers or {}
            current_try = int(headers.get('retries', b'0').decode()) + 1
            # Linear backoff, capped at MAX_RETRY_WAIT seconds.
            await asyncio.sleep(min(current_try * 10, MAX_RETRY_WAIT))
            await retry_topic.send(key=event['id'], value=event,
                                   headers={'retries': str(current_try).encode(),
                                            'error': repr(e).encode()})
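
To make the wait time concrete, here is a small standalone sketch of the same delay formula. The value of `MAX_RETRY_WAIT` and the helper name `retry_delay` are assumptions for illustration only; the answer does not define them.

```python
MAX_RETRY_WAIT = 25  # seconds; assumed value, not given in the answer


def retry_delay(current_try: int, max_wait: int = MAX_RETRY_WAIT) -> int:
    """Seconds to wait before re-sending: 10 s per attempt, capped at max_wait."""
    return min(current_try * 10, max_wait)


# Delays for the first four attempts under the assumed cap.
delays = [retry_delay(n) for n in range(1, 5)]
print(delays)  # [10, 20, 25, 25]
```

The delay grows linearly with the attempt number until it hits the cap, so a message that keeps failing never waits longer than `MAX_RETRY_WAIT` seconds between attempts.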

If you want to use the same agent for both topics, define `topic` as either main_topic or retry_topic before the agent, something like this:

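import os
# Environment variables are strings ('False' would be truthy), so parse the flag explicitly.
use_retry_topic = os.environ.get('USE_RETRY_TOPIC', 'false').lower() in ('1', 'true', 'yes')
topic = app.topic(retry_topic_name if use_retry_topic else main_topic_name)
retry_topic = app.topic(retry_topic_name)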

This way you need two processes. One starts with USE_RETRY_TOPIC = False: it reads the main topic and, if something goes wrong, sends the message to retry_topic after a delay. The other starts with USE_RETRY_TOPIC = True: it consumes the retry topic and, if something goes wrong again, sends the message back to the very same topic with an incremented retries count.

To cap the number of retries, add a condition that checks the retries count and stops re-sending once it is greater than 3.
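
A minimal sketch of such a check, kept independent of Faust so it can be tried on its own. The helper name `next_retry_headers` and the constant `MAX_RETRIES` are hypothetical; the sketch assumes the headers behave like a mapping with byte values, as in the agent above.

```python
from typing import Dict, Mapping, Optional

MAX_RETRIES = 3  # assumed limit, taken from the question


def next_retry_headers(headers: Optional[Mapping[str, bytes]],
                       error: Exception,
                       max_retries: int = MAX_RETRIES) -> Optional[Dict[str, bytes]]:
    """Return headers for the next retry, or None once retries are exhausted."""
    current_try = int((headers or {}).get('retries', b'0').decode()) + 1
    if current_try > max_retries:
        # Give up: the caller can log the message or send it to a dead letter topic.
        return None
    return {'retries': str(current_try).encode(),
            'error': repr(error).encode()}


# First failure on the main topic: no headers yet, so this schedules retry 1.
print(next_retry_headers(None, ValueError('boom')))
# Failure of the third retry: nothing more to schedule.
print(next_retry_headers({'retries': b'3'}, ValueError('boom')))
```

In the agent's `except` branch, one would call the helper first and only sleep and send to `retry_topic` when it returns headers; when it returns `None`, the message has already been retried 3 times.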

Please note that this delay logic may not be very safe: if the process fails unexpectedly while waiting to send the message to retry_topic, the message might be lost.

Also, this approach may break the message order, because a retried message is consumed after messages that were originally produced later (link).

Artem Ilin