I am using Akka Streams Kafka to pipe Kafka messages to a remote service, and I want to guarantee that the service receives every message exactly once (that is, both at-least-once and at-most-once delivery).
Here's the code I came up with:
import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import com.typesafe.config.Config
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._

private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                         topicPattern: String,
                         mapCommittableMessageToSinkMessage: CommittableMessage[String, String] => T): Unit = {
  val groupId = config.getString("group-id")
  implicit val materializer = ActorMaterializer()
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  implicit val timeout = Timeout(5.seconds) // timeout for the reply to the ask call below
  import system.dispatcher // the ExecutionContext used by the ask call below

  Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))
    .map(message => (message, mapCommittableMessageToSinkMessage(message)))
    .mapAsync(1) { case (message, transformed) => ask(subscriber, transformed).map(_ => message) }
    .mapAsync(1)(message => message.committableOffset.commitScaladsl())
    .runWith(Sink.ignore)
}
As the code shows, each element is mapped to a tuple of the original committable message and the transformed message that gets sent to the subscriber (an actor that forwards to the remote service). The tuple exists so that the offset can be committed only after the subscriber has finished processing the transformed message.
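To make sure I understand my own pipeline: with parallelism 1, `mapAsync` is equivalent to chaining each stage's Future sequentially, so every commit runs strictly after its message's send has completed. Here is a minimal, Kafka-free sketch of that ordering; `Msg`, `send`, and `commit` are hypothetical stand-ins for the real machinery, not Alpakka APIs.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-ins: send mimics the ask to the subscriber,
// commit mimics committableOffset.commitScaladsl().
final case class Msg(offset: Long, payload: String)

var committed = Vector.empty[Long]            // records the order commits happen in
def send(m: Msg): Future[Unit] = Future(())   // pretend delivery to the remote service
def commit(offset: Long): Future[Unit] = Future { committed :+= offset }

// mapAsync(1) over send, then mapAsync(1) over commit, collapses to one
// sequential Future chain: each commit runs only after its send completes.
val msgs = (1L to 3L).map(i => Msg(i, s"m$i"))
val done = msgs.foldLeft(Future.unit) { (acc, m) =>
  acc.flatMap(_ => send(m)).flatMap(_ => commit(m.offset))
}
Await.result(done, 5.seconds)
// after completion, committed == Vector(1L, 2L, 3L)
```

So a crash between `send` and `commit` would replay the uncommitted message on restart, which is why this is at-least-once rather than true exactly-once.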
Something about it just seems like an anti-pattern, but I'm not sure of a better way to do it. Any suggestions on a better approach?
Thanks!