2

TLDR:

  • Is committing produced message's offset as consumed (even if it wasn't) expected behavior for auto-commit enabled Kafka clients? (for the applications that consuming and producing the same topic)

Detailed explanation:

I have a simple scala application that has an Akka actor which consumes messages from a Kafka topic and produces the message to the same topic if any exception occurs during message processing.

TestActor.scala

  override protected def processMessage(messages: Seq[ConsumerRecord[String, String]]): Future[Done] = {
    Future.sequence(messages.map(message => {
      logger.info(s"--CONSUMED: offset: ${message.offset()} message: ${message.value()}")
      // in actual implementation, some process is done here and if an exception occurs, the message is sent to the same topic as seen below
      sendToExceptionTopic(Instant.now().toEpochMilli)
      Thread.sleep(1000)
      Future(Done)
    })).transformWith(_ => Future(Done))
  }

This actor starts every minute and runs for 20 seconds then stops.

Starter.scala

  def init(): Unit = {
    exceptionManagerActor ! InitExceptionActors

    system.scheduler.schedule(2.second, 60.seconds) {
      logger.info("started consuming messages")
      exceptionManagerActor ! ConsumeExceptions
    }
  }

ExceptionManagerActor.scala

  private def startScheduledActor(actorRef: ActorRef): Unit = {
    actorRef ! Start

    context.system.scheduler.scheduleOnce(20.seconds) {
      logger.info("stopping consuming messages")
      actorRef ! Stop
    }
  }

BaseActorWithAutoCommit.scala

  override def receive: Receive = {
    case Start =>
      consumerBase = consumer
        .groupedWithin(20, 2000.millisecond)
        .mapAsyncUnordered(10)(processMessage)
        .toMat(Sink.seq)(DrainingControl.apply)
        .run()

    case Stop =>
      consumerBase.drainAndShutdown().transformWith {
        case Success(value) =>
          logger.info("actor stopped")
          Future(value)
        case Failure(ex) =>
          logger.error("error: ", ex)
          Future.failed(ex)
      }
    //Await.result(consumerBase.drainAndShutdown(), 1.minute)
  }

With this configuration, while stopping, Kafka client is committing the latest produced message's offset as if it was consumed.

Example logs:

14:28:48.868 INFO - started consuming messages
14:28:50.945 INFO - --CONSUMED: offset: 97 message: 1
14:28:51.028 INFO - ----PRODUCED: offset: 98 message: 1643542130945
...
14:29:08.886 INFO - stopping consuming messages
14:29:08.891 INFO - --CONSUMED: offset: 106 message: 1643542147106
14:29:08.895 INFO - ----PRODUCED: offset: 107 message: 1643542148891 <------ this message was lost
14:29:39.946 INFO - actor stopped
14:29:39.956 INFO - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://test-consumer/system/Materializers/StreamSupervisor-2/$$a#1541548736] to Actor[akka://test-consumer/system/kafka-consumer-1#914599016] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://test-consumer/system/kafka-consumer-1#914599016] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14:29:48.866 INFO - started consuming messages <----- The message with offset 107 was expected in this cycle to consume but it was not consumed
14:30:08.871 INFO - stopping consuming messages
14:30:38.896 INFO - actor stopped

As you can see from the logs, a message with offset 107 is produced but was not consumed in the next cycle.

Actually, I am not an expert on Akka actors and don't know if this situation comes from Kafka or Akka, but it seems it is related to auto-commit to me.


used dependency versions:

lazy val versions = new {
  val akka = "2.6.13"
  val akkaHttp = "10.1.9"
  val alpAkka = "2.0.7"
  val logback = "1.2.3"
  val apacheCommons = "1.7"
  val json4s = "3.6.7"
}

libraryDependencies ++= {
  Seq(
    "com.typesafe.akka" %% "akka-slf4j" % versions.akka,
    "com.typesafe.akka" %% "akka-stream-kafka" % versions.alpAkka,
    "com.typesafe.akka" %% "akka-http" % versions.akkaHttp,
    "com.typesafe.akka" %% "akka-protobuf" % versions.akka,
    "com.typesafe.akka" %% "akka-stream" % versions.akka,
    "ch.qos.logback" % "logback-classic" % versions.logback,
    "org.json4s" %% "json4s-jackson" % versions.json4s,
    "org.apache.commons" % "commons-text" % versions.apacheCommons,
  )
}

An example source code and steps to reproduce the situation can be reached from this repository

kadir
  • 589
  • 1
  • 7
  • 29
  • 1
    I suggest trying to print thread ids in your logs – OneCricketeer Jan 30 '22 at 15:36
  • actually thread ids are printed(I just removed in the question) but it did not help me. You can see in readme file of https://github.com/akadir/scala-kafka-experiment – kadir Jan 30 '22 at 18:56
  • 1
    My point is that thread is that thread 5, 6, and 7, there in the readme are all running in parallel. You can't just look at a linear stream of timestamps and assume that's the exact order things are happening as if it's one thread. But also, Kafka producers dont immediately send every record, it batches them, and drops them if the producer is not flushed before its closed, so you should only log on an ACK callback, or after a flush – OneCricketeer Jan 31 '22 at 15:34

1 Answers1

3

As far as Kafka is concerned, the message is consumed as soon as Alpakka Kafka reads it from Kafka.

This is before the actor inside of Alpakka Kafka has emitted it to a downstream consumer for application level processing.

Kafka auto-commit (enable.auto.commit = true) will thus result in the offset being committed before the message has been sent to your actor.

The Kafka docs on offset management do (as of this writing) refer to enable.auto.commit as having an at-least-once semantic, but as noted in my first paragraph, this is an at-least-once delivery semantic, not an at-least-once processing semantic. The latter is an application level concern, and accomplishing that requires delaying the offset commit until processing has completed.

The Alpakka Kafka docs have an involved discussion about at-least-once processing: in this case, at-least-once processing will likely entail introducing manual offset committing and replacing mapAsyncUnordered with mapAsync (since mapAsyncUnordered in conjunction with manual offset committing means that your application can only guarantee that a message from Kafka gets processed at-least-zero times).

In Alpakka Kafka, a broad taxonomy of message processing guarantees:

  • hard at-most-once: Consumer.atMostOnceSource - commit after every message before processing
  • soft at-most-once: enable.auto.commit = true - "soft" because the commits are actually batched for increased throughput, so this is really "at-most-once, except when it's at-least-once"
  • hard at-least-once: manual commit only after all processing has been verified to succeed
  • soft at-least-once: manual commit after some processing has been completed (i.e. "at-least-once, except when it's at-most-once")
  • exactly-once: not possible in general, but if your processing has the means to dedupe and thus make duplicates idempotent, you can have effectively-once
Levi Ramsey
  • 18,884
  • 1
  • 16
  • 30
  • Thank you very much for your detailed answer! It helped me a lot to understand how things work under the hood. – kadir Feb 02 '22 at 10:16