
We are facing a scenario where our akka-stream-kafka consumer's processing rate decreases whenever there is lag on the partitions. When we start it with no lag on the partitions, the processing rate picks up immediately.

MSK cluster - 10 topics - 40 partitions each => 400 total leader partitions

To achieve high throughput and parallelism in the system, we implemented akka-stream-kafka consumers subscribing to each topic-partition separately, resulting in a 1:1 mapping between consumers and partitions.

Here is consumer setup:

  1. Number of EC2 service instances - 7
  2. Each service instance spins up 6 consumers for each of the 10 topics, resulting in 60 consumers per service instance.
  3. Total consumers = number of instances (7) × consumers per service instance (60) = 420

So, in total we are starting 420 consumers spread across the instances. As per the RangeAssignor partition strategy (the default), each partition gets assigned to a different consumer: 400 consumers take the 400 partitions and 20 consumers remain unused. We have verified this allocation and it looks as expected.
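
We do not override the assignment strategy anywhere, so the Kafka default (RangeAssignor) applies. For illustration only (this is not code we actually run; consumerSettings refers to the settings shown further below), pinning the assignor explicitly would look like:

import org.apache.kafka.clients.consumer.{ConsumerConfig, RangeAssignor}

// Make the default RangeAssignor explicit on the consumer settings (sketch only).
val settingsWithAssignor =
  consumerSettings.withProperty(
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    classOf[RangeAssignor].getName
  )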

Instance Type used: c5.xlarge

MSK Config:

Apache Kafka version - 2.4.1.1

Total number of brokers - 9 ( spread across 3 AZs)

Broker Type: kafka.m5.large

Broker per Zone: 3

auto.create.topics.enable=true

default.replication.factor=3

min.insync.replicas=2

num.io.threads=8

num.network.threads=5

num.partitions=40

num.replica.fetchers=2

replica.lag.time.max.ms=30000

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

socket.send.buffer.bytes=102400

unclean.leader.election.enable=true

zookeeper.session.timeout.ms=18000

log.retention.ms=259200000

This is the configuration we are using for each consumer:

akka.kafka.consumer {
 kafka-clients {
  bootstrap.servers = "localhost:9092"
  client.id = "consumer1"
  group.id = "consumer1"
  auto.offset.reset="latest"
 }
 aws.glue.registry.name = "Registry1"
 aws.glue.avroRecordType = "GENERIC_RECORD"
 aws.glue.region = "region"
 kafka.value.deserializer.class = "com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer"

 # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
 # configured by `consumer.metadata-request-timeout`
 connection-checker {

  #Flag to turn on connection checker
  enable = true

  # Amount of attempts to be performed after a first connection failure occurs
  # Required, non-negative integer
  max-retries = 3

  # Interval for the connection check. Used as the base for exponential retry.
  check-interval = 15s

  # Check interval multiplier for backoff interval
  # Required, positive number
  backoff-factor = 2.0
 }
}

akka.kafka.committer {

 # Maximum number of messages in a single commit batch
 max-batch = 10000

 # Maximum interval between commits
 max-interval = 5s

 # Parallelism for async committing
 parallelism = 1500

 # API may change.
 # Delivery of commits to the internal actor
 # WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
 # SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
 delivery = WaitForAck

 # API may change.
 # Controls when a `Committable` message is queued to be committed.
 # OffsetFirstObserved: When the offset of a message has been successfully produced.
 # NextOffsetObserved: When the next offset is observed.
 when = OffsetFirstObserved
}
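
For reference, the main committer settings can also be built programmatically; a sketch of the equivalent API calls (we actually configure them via the HOCON above):

import akka.actor.ActorSystem
import akka.kafka.{CommitDelivery, CommitterSettings}
import scala.concurrent.duration._

// Programmatic equivalent of the akka.kafka.committer block above (sketch only).
def committerSettingsFor(system: ActorSystem): CommitterSettings =
  CommitterSettings(system)
    .withMaxBatch(10000)
    .withMaxInterval(5.seconds)
    .withParallelism(1500)
    .withDelivery(CommitDelivery.WaitForAck)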


akka.http {
 client {
  idle-timeout = 10s
 }
 host-connection-pool {
  idle-timeout = 10s
  client {
   idle-timeout = 10s
  }
 }
}

consumer.parallelism=1500

We are using the code below to materialise the flow from Kafka to an empty sink:

override implicit val actorSystem = ActorSystem("Consumer1")
override implicit val materializer = ActorMaterializer()
override implicit val ec = actorSystem.dispatcher
val topicsName = "Set of Topic Names"
val parallelism = conf.getInt("consumer.parallelism")

// Resume the stream on any failure in the processing stage
val supervisionDecider: Supervision.Decider = {
  case _ => Supervision.Resume
}

val committer = committerSettings.getOrElse(CommitterSettings(actorSystem))
val supervisionStrategy = ActorAttributes.supervisionStrategy(supervisionDecider)

val stream =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topicsName))
    .mapAsync(parallelism) { msg =>
      // f processes the record; on success or failure the offset is passed on for commit
      f(msg.record.key(), msg.record.value())
        .map(_ => msg.committableOffset)
        .recoverWith {
          case _ => Future.successful(msg.committableOffset)
        }
    }
    .toMat(Committer.sink(committer).withAttributes(supervisionStrategy))(DrainingControl.apply)
    .withAttributes(supervisionStrategy)
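
The graph above is only a blueprint until it is run. Materialising it and shutting it down cleanly looks roughly like this (a sketch; the actual shutdown wiring is elided in our code, and akka.Done / scala.concurrent.Future are assumed to be imported):

// Materialise the stream. DrainingControl combines the consumer Control with the
// sink's completion future, so in-flight commits can be drained on shutdown.
val control: DrainingControl[Done] = stream.run()

// On service shutdown: stop polling, wait for pending commits, then close the consumer.
val shutdown: Future[Done] = control.drainAndShutdown()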

Library versions used in the code:

"com.typesafe.akka" %% "akka-http"         % "10.1.11",
"com.typesafe.akka" %% "akka-stream-kafka" % "2.0.3",
"com.typesafe.akka" %% "akka-stream"       % "2.5.30"

The observations are as follows:

  1. In successive intervals of, say, 1 hour, only some of the consumers are actively consuming their lag and processing at the expected rate.
  2. In the next hour, some other consumers become active, consume from their partitions, and then stop processing.
  3. All the lag gets cleared in a single shot, as observed from the offset-lag graph.

We want all the consumers to be running in parallel and processing messages in near real time. This 3-day lag in processing is causing a major outage for us. I tried following the issue below, but we are already on the fixed version: https://github.com/akka/alpakka-kafka/issues/549

Can anyone help identify what we are missing in the consumer configuration, or point to some other issue?

Graph of Offset Lag Per Partition Per Topic

1 Answer


That lag graph seems to me to indicate that your overall system isn't capable of handling all the load, and it almost looks like only one partition at a time is actually making progress.

That phenomenon suggests that the processing done in f is ultimately gated on the rate at which some queue can be cleared, and that the parallelism in the mapAsync stage is too high, effectively racing the partitions against each other. Since the Kafka consumer batches records (by default in batches of 500, assuming the consumer's lag is more than 500 records), if the parallelism is higher than that, all of those records enter the queue at basically the same time as a block. The parallelism in the mapAsync is 1500; given the apparent use of the Kafka default batch size of 500, this seems far too high: there's no reason for it to be greater than the Kafka batch size, and if you want a more even consumption rate between partitions, it should be a lot less than that batch size.
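
As an illustration (a sketch, not code from your service; it assumes consumerSettings is the ConsumerSettings instance used in your stream), you can pin the batch size explicitly and keep the mapAsync parallelism well below it:

import org.apache.kafka.clients.consumer.ConsumerConfig

// Make the poll batch size explicit (500 is the Kafka consumer default) and keep the
// per-consumer in-flight work well below it, so a single partition's batch cannot
// monopolise the downstream queue.
val maxPollRecords = 500
val tunedConsumerSettings =
  consumerSettings.withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
val mapAsyncParallelism = maxPollRecords / 10 // roughly 50, as suggested below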

Without details on what happens in f, it's hard to say what that queue is and how much parallelism should be reduced. But there are some general guidelines I can share:

  • If the work is CPU-bound (a sign of this would be very high CPU utilization on your consumer instances), you have 7 instances with 4 vCPUs apiece. You cannot physically process more than 28 (7 × 4) records at a time, so the parallelism in the mapAsync shouldn't exceed 1; alternatively, you need more and/or bigger instances.
  • If the work is I/O-bound or otherwise blocking, I would be careful about which thread pool/execution context/Akka dispatcher the work is being done on. All of those will typically spawn only a bounded number of threads and maintain a work queue when all threads are busy; that work queue could very well be the queue of interest. Expanding the number of threads in that pool (or, if the work is on the default execution context or default Akka dispatcher, moving it to an appropriately sized dedicated pool) will decrease the pressure on the queue; see the sketch after this list.
  • Since you're including akka-http, it's possible that the processing of messages in f involves sending an HTTP request to some other service. In that case, it's important to remember that Akka HTTP maintains a queue per targeted host; it's also likely that there's a queue on the target side which governs throughput there. This is essentially a special case of the second (I/O-bound) situation.
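
Here's a minimal sketch of that second point (illustrative only: the signature of f, the pool, and its size are assumptions, not taken from your code):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Dedicated, bounded pool for blocking work so it does not starve the default dispatcher.
// The size (64) is a placeholder and should be tuned against the downstream resource.
val blockingEc: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(64))

// Hypothetical shape of f: run the blocking call on the dedicated pool.
def f(key: String, value: Any): Future[Unit] =
  Future {
    // blocking I/O (database call, HTTP request, ...) goes here
  }(blockingEc)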

The I/O bound/blocking situation will be evidenced by very low CPU utilization on your instances. If you're filling the queue per targeted host, you'll see log messages about "Exceeded configured max-open-requests value".
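
If that's what you're seeing, the per-host pool can be sized explicitly. A sketch (the numbers are placeholders, and it assumes f issues requests via Http().singleRequest):

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import scala.concurrent.Future

// Per-host connection pool settings; overflowing the per-host request queue is what
// produces the "Exceeded configured max-open-requests value" errors.
val poolSettings = ConnectionPoolSettings(actorSystem)
  .withMaxConnections(32)    // concurrent connections per host (placeholder)
  .withMaxOpenRequests(256)  // queued + in-flight requests per host; must be a power of 2

def callDownstream(req: HttpRequest): Future[HttpResponse] =
  Http()(actorSystem).singleRequest(req, settings = poolSettings)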

Another thing worth noting is that, because the Kafka consumer is inherently blocking, the Alpakka Kafka consumer actors run on their own dispatcher, whose size is 16 by default, meaning that per host, at most 16 consumers or producers can be doing work at a time. Setting akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size to at least the number of consumers your app starts up per instance (60 in your configuration of 6 consumers for each of 10 topics) is probably a good idea. Thread starvation in the Alpakka Kafka dispatcher can cause consumer rebalances, which will disrupt consumption.

Without making any other changes, I would suggest, for a more even consumption rate across partitions, setting

akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size = 60
consumer.parallelism = 50
Levi Ramsey
  • My CPU utilisation is quite low, so it seems like the I/O-bound/blocking situation you described. I will try the changes you suggested, but how did you come up with consumer.parallelism=50? – Abhinandan Sanduja Oct 29 '21 at 04:05
  • it's about a tenth of the Kafka batch size, so it's approximate. Anything close to or greater than 500 will lead to pretty consistent unfairness in which consumers can actually process messages. – Levi Ramsey Oct 29 '21 at 09:17
  • I have tried the given config, but it still looks the same. One thing I noticed and can't wrap my head around is that all the consumers belonging to the same topic get picked at the same time and their lag is cleared, and then the consumers of another topic get picked up. Dispatcher scheduling shouldn't give us this topic-dependent allocation pattern. Could it be because of Kafka? – Abhinandan Sanduja Oct 29 '21 at 10:10
  • Kafka will generally be slower to give a consumer messages if the consumer is far behind, but that shouldn't make that noticeable of a difference. Depending on what the rate of incoming messages is, the graph you posted is somewhat consistent with the processing rate remaining fairly constant. Give `consumer.parallelism = 1` a try. – Levi Ramsey Oct 29 '21 at 11:18
  • @LeviRamsey The retention period is 3 days and the graph suggests that the lag gets cleaned up every 3 days. It might be that the consumption rate is much lower than the production rate and the partition log segments are deleted every 3 days, while the lag just continues to increase in between. Is that a possibility? However, I am not sure how Kafka performs the cleanup: should we see a cliff every day, indicating deletion of logs older than 3 days, or a cliff only on every 3rd day? – A.G. Nov 12 '21 at 03:07
  • With retention I wouldn't expect the pattern of sharp lag buildup (indicative of continuous production) followed by the lag going to zero (dropping to zero quickly implies almost no new messages coming into that partition for at least the retention period). – Levi Ramsey Nov 12 '21 at 12:56