I am trying to have an At-least-once commit strategy for offset in akka streams, and I am not able to understand what is the expected pattern for the cases I use filter on my stream.
my expectations is that none of the filtered messages will get their offset comited, so they will end up in an infinite loop of processing.
An abasurd example ilustrating this is filtering all messages like this:
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl())
.runWith(Sink.ignore)
I can only see a solution of wrapping my filters in flows that check if the logic will filter out and commit in that case, but this seems not elegant, and deminish the value of having filter shapes.
Filtering is not a rare thing to do, but i cannot see any elegant way of committing the offset? for me It seems strange there is no way to do this by the framework, so what am I missing?