4

I have a bunch of services that are integrated via Apache Kafka, and each service has its own consumers and producers, but I'm facing a slow consumption rate: something seems to slow the consumers down when a lot of load hits the topic.

Here's an example of my kafka consumer implementation:

    public class Consumer : BackgroundService
    {
        private readonly KafkaConfiguration _kafkaConfiguration;
        private readonly ILogger<Consumer> _logger;
        private readonly IConsumer<Null, string> _consumer;
        private readonly IMediator _mediator;

        /// <summary>
        /// Stores the injected dependencies, resolves an <see cref="IMediator"/> from a
        /// freshly created DI scope and builds the Kafka consumer for the
        /// "order-service" consumer group.
        /// </summary>
        /// <param name="kafkaConfiguration">Supplies the broker connection string.</param>
        /// <param name="logger">Logger used by this consumer.</param>
        /// <param name="provider">Factory used to create the scope the mediator is resolved from.</param>
        public Consumer(
            KafkaConfiguration kafkaConfiguration,
            ILogger<Consumer> logger,
            IServiceScopeFactory provider
        )
        {
            _kafkaConfiguration = kafkaConfiguration;
            _logger = logger;

            // NOTE(review): this scope is not kept in a field and is not disposed here,
            // so the mediator (and whatever it resolves) lives as long as the scope does.
            var mediatorScope = provider.CreateScope();
            _mediator = mediatorScope.ServiceProvider.GetRequiredService<IMediator>();

            var settings = new ConsumerConfig();
            settings.GroupId = "order-service";
            settings.BootstrapServers = kafkaConfiguration.ConnectionString;
            settings.SecurityProtocol = SecurityProtocol.Plaintext;
            settings.SessionTimeoutMs = 6000;
            settings.AutoOffsetReset = AutoOffsetReset.Earliest;
            // Offsets are not stored automatically; the consume loop is expected to store them.
            settings.EnableAutoOffsetStore = false;
            settings.AutoCommitIntervalMs = 1000;
            settings.ConsumeResultFields = "none";
            settings.QueuedMinMessages = 1000000;
            settings.FetchWaitMaxMs = 100;

            var builder = new ConsumerBuilder<Null, string>(settings);
            _consumer = builder.Build();
        }

        /// <summary>
        /// Starts the consume loop in the background and returns the task that represents it.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        /// <returns>A task that completes when the consume loop has exited.</returns>
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // The first Consume() call blocks, so the loop is moved off the host start-up path.
            // Returning the loop's task (instead of starting a raw thread and handing back
            // Task.CompletedTask) lets the host observe faults and await a clean shutdown.
            // A dedicated thread gave no benefit anyway: it ends at the first await and the
            // continuation resumes on the thread pool.
            return Task.Run(() => StartConsumingAsync(stoppingToken), stoppingToken);
        }

        /// <summary>
        /// Subscribes to the "orders" topic and, until cancellation is requested, consumes
        /// messages one at a time, deserializes each into an
        /// <see cref="OrderReceivedIntegrationEvent"/> and publishes it through the mediator.
        /// </summary>
        /// <param name="cancellationToken">Stops the loop and unblocks a pending consume call.</param>
        /// <returns>A task that completes once the loop has ended and the consumer is closed.</returns>
        public async Task StartConsumingAsync(CancellationToken cancellationToken)
        {
            _consumer.Subscribe("orders");

            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumedResult = _consumer.Consume(cancellationToken);

                        if (consumedResult == null)
                        {
                            continue;
                        }

                        var payload = consumedResult.Message?.Value;
                        var messageAsEvent = payload == null
                            ? null
                            : JsonSerializer.Deserialize<OrderReceivedIntegrationEvent>(payload);

                        if (messageAsEvent == null)
                        {
                            // Empty payload or a literal JSON "null": nothing to dispatch.
                            _logger.LogWarning("Skipping message without a usable payload");
                        }
                        else
                        {
                            await _mediator.Publish(messageAsEvent, CancellationToken.None);
                        }

                        // The consumer is configured with EnableAutoOffsetStore = false, so the
                        // offset must be stored explicitly or the auto-commit never has anything
                        // to commit and every restart re-reads the topic from the beginning.
                        _consumer.StoreOffset(consumedResult);
                    }
                    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
                    {
                        // Normal shutdown path: Consume() was interrupted by the stopping token.
                        break;
                    }
                    catch (Exception e)
                    {
                        // Pass the exception object so the stack trace is not lost.
                        _logger.LogCritical(e, "Error {Message}", e.Message);
                    }
                }
            }
            finally
            {
                // Leave the consumer group cleanly so partitions are reassigned immediately
                // instead of after the session timeout.
                _consumer.Close();
            }
        }

and here's an example of my producer:

/// <summary>
/// Base class that publishes order events to the "orders" Kafka topic as JSON.
/// </summary>
public class Producer : System.IDisposable
    {
        private const string OrdersTopic = "orders";

        // Upper bound for draining the local queue on dispose.
        private static readonly System.TimeSpan FlushTimeout = System.TimeSpan.FromSeconds(10);

        /// <summary>
        /// Underlying Kafka producer. It cannot be named <c>Producer</c>: a member may not
        /// share the name of its enclosing type (compiler error CS0542).
        /// </summary>
        protected readonly IProducer<Null, string> KafkaProducer;

        private bool _disposed;

        /// <summary>Creates a producer connected to the given bootstrap servers.</summary>
        /// <param name="host">Bootstrap server list passed to the Kafka client.</param>
        protected Producer(string host)
        {
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = host,
                Acks = Acks.Leader
            };

            KafkaProducer = new ProducerBuilder<Null, string>(producerConfig).Build();
        }

        /// <summary>
        /// Serializes <paramref name="message"/> to JSON and enqueues it for delivery to
        /// the "orders" topic. The call returns before delivery is confirmed.
        /// </summary>
        /// <param name="message">Event to publish.</param>
        public void Produce(InitialOrderCreatedIntegrationEvent message)
        {
            var messageSerialized = JsonSerializer.Serialize(message);
            KafkaProducer.Produce(OrdersTopic, new Message<Null, string> {Value = messageSerialized});
        }

        /// <summary>Flushes queued messages and releases the underlying producer.</summary>
        public void Dispose()
        {
            Dispose(true);
            System.GC.SuppressFinalize(this);
        }

        /// <summary>Releases the underlying producer once; later calls are no-ops.</summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                // Produce() only enqueues; without a flush, messages still in the local
                // queue are dropped when the process exits.
                KafkaProducer.Flush(FlushTimeout);
                KafkaProducer.Dispose();
            }

            _disposed = true;
        }
    }

As you can see, the consumer only reads the message from the Kafka topic, deserializes it into a MediatR INotification object, and then publishes it to the handler.

The handler works with database transactions, Redis cache reads/writes, and push notifications.

an example of my handler:

/// <summary>
/// Applies a received order event: loads the order, updates its price and status,
/// commits the unit of work and, when the commit reports success, publishes a cache
/// modification request. Failures are logged and not rethrown.
/// </summary>
/// <param name="notification">The received order event.</param>
/// <param name="cancellationToken">Forwarded to the cache modification publish.</param>
public override async Task Handle(OrderReceivedIntegrationEvent notification, CancellationToken cancellationToken)
        {
            try
            {
                // Get order from database
                var order = await _orderRepository.GetOrderByIdAsync(notification.OrderId.ToString());

                if (order == null)
                {
                    // Report a missing order explicitly instead of letting it surface as a
                    // NullReferenceException from the calls below.
                    _logger.LogWarning("Order {OrderId} not found; event skipped", notification.OrderId);
                    return;
                }

                order.EditOrder(default, notification.Price);

                order.ChangeOrderStatus(notification.Status, notification.RejectReason);

                // commit the transaction
                if (await _uow.Commit())
                {
                    var cacheModificationRequest = _mapper.Map<CacheOrdersModificationRequestedIntegrationEvent>(order);

                    // send mediatr notification to change cache information in Redis
                    await _bus.Publish(cacheModificationRequest, cancellationToken);
                }
            }
            catch (Exception e)
            {
                // A failure is an error, not information; pass the exception to keep the stack trace.
                _logger.LogError(e, "Error {Message}", e.Message);
            }
        }

But when I run a load test with 2000 requests and a ramp-up of 15 seconds, the consumer starts to slow down, taking something like 2–5 minutes to consume all 2000 requests.

I was wondering whether removing the MediatR layer and handling the processing directly in the Consumer class would improve the performance,

or whether there is some Kafka configuration that would improve the throughput, such as removing the acks from the in-sync topic replicas, or committing the offset only after a longer interval.

At first I implemented Kafka using the MassTransit library. After I noticed this slow consumption rate, I switched to the Confluent.Kafka library to remove the MassTransit abstraction layer and see whether that would improve things, but the result is still the same:

<PackageReference Include="Confluent.Kafka" Version="1.7.0" />

Can anyone who has already faced the same problem help me?

Note: My Kafka cluster runs with 3 brokers in Kubernetes, and each topic has 6 partitions with a replication factor of 3.

Gabriel Guedes
  • 481
  • 5
  • 15

0 Answers0