We are facing a scenario where the processing rate of our akka-stream-kafka consumers drops whenever there is lag on the partitions. When we start the consumers against partitions with no lag, the processing rate immediately goes up.
MSK cluster - 10 topics - 40 partitions each => 400 total leader partitions
To achieve high throughput and parallelism, we run enough akka-stream-kafka consumers that every topic-partition ends up with its own consumer, i.e. a 1:1 mapping between consumers and partitions.
Here is the consumer setup:
- Number of EC2 service instances - 7
- Each service instance spins up 6 consumers for each of the 10 topics, resulting in 60 consumers per instance.
- Total consumers = number of instances (7) * consumers per instance (60) = 420
So in total we start 420 consumers spread across the instances. With the default RangeAssignor partition strategy, each partition is assigned to a different consumer: 400 consumers take the 400 partitions and the remaining 20 consumers stay idle. We have verified this allocation and it looks correct (see the sketch below for how we check it).
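For completeness, this is roughly how we confirm the assignment from the application side: a minimal sketch that only logs rebalance events via the subscription's rebalance listener (the RebalanceLogger actor and the topic name are illustrative, not our production code).

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.kafka.{Subscriptions, TopicPartitionsAssigned, TopicPartitionsRevoked}

// Logs which partitions this consumer instance receives on every rebalance,
// so the 1:1 consumer-to-partition mapping can be confirmed from the logs.
class RebalanceLogger extends Actor with ActorLogging {
  override def receive: Receive = {
    case TopicPartitionsAssigned(_, assigned) =>
      log.info("Assigned partitions: {}", assigned.mkString(", "))
    case TopicPartitionsRevoked(_, revoked) =>
      log.info("Revoked partitions: {}", revoked.mkString(", "))
  }
}

val system = ActorSystem("RebalanceCheck")
val rebalanceListener = system.actorOf(Props[RebalanceLogger], "rebalance-logger")

// Attach the listener to the subscription used by each consumer stream
val subscription =
  Subscriptions.topics("topic-1").withRebalanceListener(rebalanceListener)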
Instance Type used: c5.xlarge
MSK Config:
Apache Kafka version - 2.4.1.1
Total number of brokers - 9 (spread across 3 AZs)
Broker Type: kafka.m5.large
Brokers per Zone: 3
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=40
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
log.retention.ms=259200000
This is the configuration we are using for each consumer:
akka.kafka.consumer {
  kafka-clients {
    bootstrap.servers = "localhost:9092"
    client.id = "consumer1"
    group.id = "consumer1"
    auto.offset.reset = "latest"
  }
  aws.glue.registry.name = "Registry1"
  aws.glue.avroRecordType = "GENERIC_RECORD"
  aws.glue.region = "region"
  kafka.value.deserializer.class = "com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer"
  # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
  # configured by `consumer.metadata-request-timeout`
  connection-checker {
    # Flag to turn on connection checker
    enable = true
    # Amount of attempts to be performed after a first connection failure occurs
    # Required, non-negative integer
    max-retries = 3
    # Interval for the connection check. Used as the base for exponential retry.
    check-interval = 15s
    # Check interval multiplier for backoff interval
    # Required, positive number
    backoff-factor = 2.0
  }
}
akka.kafka.committer {
  # Maximum number of messages in a single commit batch
  max-batch = 10000
  # Maximum interval between commits
  max-interval = 5s
  # Parallelism for async committing
  parallelism = 1500
  # API may change.
  # Delivery of commits to the internal actor
  # WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
  # SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
  delivery = WaitForAck
  # API may change.
  # Controls when a `Committable` message is queued to be committed.
  # OffsetFirstObserved: When the offset of a message has been successfully produced.
  # NextOffsetObserved: When the next offset is observed.
  when = OffsetFirstObserved
}
akka.http {
  client {
    idle-timeout = 10s
  }
  host-connection-pool {
    idle-timeout = 10s
    client {
      idle-timeout = 10s
    }
  }
}
consumer.parallelism=1500
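For reference, this is roughly how the HOCON above is loaded into settings objects (a minimal sketch, not our exact code; the String deserializers are placeholders, since in the real service the value deserializer is the Glue AWSKafkaAvroDeserializer configured above):

import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings}
import org.apache.kafka.common.serialization.StringDeserializer

val system = ActorSystem("Consumer1")

// Loads the "akka.kafka.consumer" section shown above, including the kafka-clients properties
val consumerConfig = system.settings.config.getConfig("akka.kafka.consumer")
val consumerSettings =
  ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)

// Reads the "akka.kafka.committer" section (max-batch, max-interval, parallelism)
val defaultCommitterSettings = CommitterSettings(system)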
We are using the code below to materialize the flow from Kafka to an empty sink.
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import scala.concurrent.Future

override implicit val actorSystem = ActorSystem("Consumer1")
override implicit val materializer = ActorMaterializer()
override implicit val ec = actorSystem.dispatcher

val topicsName = "Set of Topic Names"
// conf is our application's Typesafe Config (consumer.parallelism = 1500)
val parallelism = conf.getInt("consumer.parallelism")

// Resume the stream on any failure in the processing logic
val supervisionDecider: Supervision.Decider = {
  case _ => Supervision.Resume
}
val supervisionStrategy = ActorAttributes.supervisionStrategy(supervisionDecider)

val committer = committerSettings.getOrElse(CommitterSettings(actorSystem))

val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topicsName))
    .mapAsync(parallelism) { msg =>
      // f is our processing function; on failure we still commit the offset
      f(msg.record.key(), msg.record.value())
        .map(_ => msg.committableOffset)
        .recoverWith {
          case _ => Future.successful(msg.committableOffset)
        }
    }
    .toMat(Committer.sink(committer).withAttributes(supervisionStrategy))(DrainingControl.apply)
    .withAttributes(supervisionStrategy)
    .run()
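On shutdown we stop the consumer via the materialized DrainingControl, roughly like this (sketch):

// Stop polling, wait for in-flight commits to complete, then shut the stream down
control.drainAndShutdown()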
Library versions used in the code:
"com.typesafe.akka" %% "akka-http" % "10.1.11",
"com.typesafe.akka" %% "akka-stream-kafka" % "2.0.3",
"com.typesafe.akka" %% "akka-stream" % "2.5.30"
The observations are as follows:
- In successive intervals of, say, 1 hour, only some of the consumers are actively consuming their lag and processing at the expected rate.
- In the next hour, some other consumers become active, consume from their partitions, and then stop processing.
- All the lag gets cleared in a single shot, as observed from the offsetLag graph (see the lag spot-check sketch below).
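For reference, this is roughly how we spot-check per-partition lag for the group outside of that graph (a minimal sketch; broker address, topic name, and partition count are placeholders):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "consumer1")

// Throwaway consumer used only to read end offsets and committed offsets for the group
val probe = new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
val partitions = (0 until 40).map(p => new TopicPartition("topic-1", p))

val endOffsets = probe.endOffsets(partitions.asJava).asScala
partitions.foreach { tp =>
  val committed = Option(probe.committed(tp)).map(_.offset()).getOrElse(0L)
  println(s"$tp lag=${endOffsets(tp).longValue() - committed}")
}
probe.close()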
We want all the consumers to run in parallel and process messages in near real time. This lag of 3 days in processing is causing major downtime for us. I tried following the issue below, but we are already on a version that contains that fix: https://github.com/akka/alpakka-kafka/issues/549
Can anyone help us figure out what we are missing in the consumer configuration, or whether there is some other issue?