One way to achieve that is to use headers.
@app.agent(topic)
async def topic_handler(stream: faust.Stream):
async for event in stream:
try:
await process_event(event)
except Exception as e:
logger.error(f'Error processing')
current_try = int(stream.current_event.headers.get('retries', b'0').decode()) + 1
await asyncio.sleep(min(current_try * 10, MAX_RETRY_WAIT))
await retry_topic.send(key=event['id'], value=event,
headers={'retries': str(current_try).encode(),
'error': repr(e).encode()})
Now if you want to use the same agent for two topics, before this agent you may define topic
as one of main_topic
or retry_topic
, something like this:
use_retry_topic = os.environ['USE_RETRY_TOPIC']
topic = app.topic(retry_topic_name if use_retry_topic else main_topic_name)
retry_topic = app.topic(retry_topic_name)
This way you need two processes. One starts with USE_RETRY_TOPIC = False
, it reads main topic and if something goes wrong it sends a message to retry_topic
after a delay.
The other process starts with USE_RETRY_TOPIC = True
, it consumes the retry topic and if something goes wrong again - sends the message to the very same topic once again, but with incremented retries
count.
You can add a condition to check the retries
count if it is greater than 3
if you want.
Please note that this delay logics maybe not very safe, e.g. if the process fails unexpectedly while waiting to send the message into the retry_topic
, this message might be lost.
Also this approach may break the message order, link