1

I am trying to have an At-least-once commit strategy for offset in akka streams, and I am not able to understand what is the expected pattern for the cases I use filter on my stream.

my expectations is that none of the filtered messages will get their offset comited, so they will end up in an infinite loop of processing.

An abasurd example ilustrating this is filtering all messages like this:

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl()) 
.runWith(Sink.ignore)

I can only see a solution of wrapping my filters in flows that check if the logic will filter out and commit in that case, but this seems not elegant, and deminish the value of having filter shapes.

Filtering is not a rare thing to do, but i cannot see any elegant way of committing the offset? for me It seems strange there is no way to do this by the framework, so what am I missing?

Kanekotic
  • 2,824
  • 3
  • 21
  • 35
  • Just running into the exact same issue. Don't have a solution yet, but also would have liked the 'filter' just to do what you expect. I guess an solution would be to set auto-commit to `true`, but that is currently not an option for us since we do not store the offset. – Joost den Boer Mar 15 '18 at 15:15
  • hello @JoostdenBoer in my case it was the same. In any case i will answer my question also in a second auto-commit works at the level of kafka so it should be fine to set it to true. We end up doing a combination of this and graceful shutdown, so wen aws/linux sends the kill signal it will wait for the current actor materializer to finish any ongoing job. – Kanekotic Mar 16 '18 at 07:24

1 Answers1

1

I was not able to find a solution with the current akka implementation to have a more intelligent commiting of the index, So I have delegated the responsibility to kafka setting auto-commit at kafka level and also combined this with a graceful shutdown strategy for the app so when the blue/green deployment happens all the messages are process before the application closes.

  • Autocommit to true:
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
  • Graceful shutdown:
val actorMaterializer = ActorMaterializer(
  ActorMaterializerSettings(system)
scala.sys.addShutdownHook {
        actorMaterializer.system.terminate()
        Await.result(actorMaterializer.system.whenTerminated, 30.seconds)
}
Kanekotic
  • 2,824
  • 3
  • 21
  • 35
  • Interesting, but I was thinking of a different solution. I have been looking at the RestartSource implementation wondering whether that would be an approach to create a RecoverableSource which put all events which cause error onto a dead letter-kind of topic including some metadata like timestamp, which service, error, etc. to be analysed later. – Joost den Boer Mar 20 '18 at 19:39
  • Or, instead of using a CommittableItem in the stream (which wraps the offset and a value 'T'), use something like a Try or Or so in the end any failure can be recovered, but also have the original event and the offset to commit. But on our side, this is still work in progress. I have some feature to deliver also. ;-) – Joost den Boer Mar 20 '18 at 19:40
  • So the idea of CommittableItem was my first approach when trying to solve this, the problem i see is any operation that removes items (filters, supervisor, etc) will have to commit the index, making it cumbersome and without bulk capacity it will make it also not performant. And also for me RestartSource seems more a fit for something like rabbitMQ than for kafka, because kafka has that retry mechanism already in place based on the commit of offset. In general seems a limitation of akka, from some conversations i had seems kafka-streams is a little bit more advance in this subject. – Kanekotic Mar 21 '18 at 11:51