We are using the .NET Kafka client to consume messages from a single topic in C# code. However, it seems to be a wee bit too slow.

I was wondering whether we could parallelize the process a bit, so I checked this answer: Kafka how to consume one topic parallel

But I don't really see how to implement this partition thing with the .NET Kafka client in my example below:

var consumerBuilder = new ConsumerBuilder<Ignore, string>(GetConfig())
    .SetErrorHandler((_, e) => _logger.LogError("Kafka consumer error on Revenue response. {@KafkaConsumerError}", e));

using (var consumer = consumerBuilder.Build())
{
    consumer.Subscribe(RevenueResponseTopicName);

    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var consumeResult = consumer.Consume(stoppingToken);

            RevenueTopicResponseModel revenueResponse;
            try
            {
                revenueResponse = JsonConvert.DeserializeObject<RevenueTopicResponseModel>(consumeResult.Value);
            }
            catch
            {
                _logger.LogCritical("Impossible to deserialize the response. {@RevenueConsumeResult}", consumeResult);
                continue;
            }
            _logger.LogInformation("Revenue response received from Kafka. {RevenueTopicResponse}",
                consumeResult.Value);

            await _revenueService.RevenueResultReceivedAsync(revenueResponse);
        }
    }
    catch (OperationCanceledException)
    {
        _logger.LogInformation($"Operation canceled. Closing {nameof(RevenueResponseConsumer)}.");
        consumer.Close();
    }
    catch (Exception e)
    {
        _logger.LogCritical(e, $"Unhandled exception during {nameof(RevenueResponseConsumer)}.");
    }
}
Natalie Perret
  • The "partition thing" is something at "topic" level: it's on the broker that you define the number of partitions for the topic, not on the consumer side. And when the topic is "split" into N partitions, then you can parallelize the process and up to N instances (with the same group.id) of the consumer can consume these N partitions – Val Bonn Aug 12 '19 at 15:47
  • @ValBonn alright so if I have no control whatsoever on the broker, I cannot do much then? – Natalie Perret Aug 12 '19 at 15:49
  • Aren't you authorized to create new topics or update existing ones? Do you know the current number of partitions of this topic? – Val Bonn Aug 13 '19 at 06:22
  • You need to have 2 different instances of the consumer with different `group.id`. Kafka maps offsets based on topic name and `group.id` – OlegI Sep 27 '19 at 11:28

3 Answers

You need to create the topic with multiple partitions, let's say 10. In your code, create 10 consumers with the same consumer group; the brokers will distribute the topic's partitions among your consumers.

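Basically, just put your code inside a for loop, with each consumer running on its own task (otherwise the first consumer's blocking consume loop would keep the others from ever starting):

// Start 10 consumers; each runs its consume loop on a separate task.
var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
    tasks.Add(Task.Run(async () =>
    {
        var consumerBuilder = new ConsumerBuilder<Ignore, string>(GetConfig())
            .SetErrorHandler((_, e) => _logger.LogError("Kafka consumer error on Revenue response. {@KafkaConsumerError}", e));

        using (var consumer = consumerBuilder.Build())
        {
            // your processing here
        }
    }));
}
// Wait until every consumer loop has finished (for example, on cancellation).
await Task.WhenAll(tasks);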
Progmer

To answer this question properly, we need to know the reason behind the requirement for partitioning.

If your topic doesn't have lots of messages to process, then partitioning is not the right tool. If the issue is that processing a single message takes too much time and you want to parallelize the work, then you could add the consumed messages to a Channel and have as many consumers of that channel as needed running in the background.

Basically, you should still use a single consumer per process, since a consumer already uses threads in the background.
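
The Channel idea above could look roughly like the sketch below. It is only an illustration under some assumptions: `ChannelFanOut`, `RunAsync`, `workerCount` and `handler` are made-up names, the capacity of 100 is arbitrary, and the message source is a plain `IEnumerable<string>` so the example stays self-contained. In the real consumer, the producer loop would call `consumer.Consume(...)` and write `consumeResult.Value` to the channel instead.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelFanOut
{
    // Reads messages from `source` on one producer task and processes them
    // on `workerCount` concurrent workers. All names here are hypothetical.
    public static async Task RunAsync(
        IEnumerable<string> source,
        int workerCount,
        Func<string, Task> handler,
        CancellationToken ct = default)
    {
        // A bounded channel applies back-pressure: the producer waits when
        // the workers fall behind instead of buffering without limit.
        var channel = Channel.CreateBounded<string>(capacity: 100);

        var producer = Task.Run(async () =>
        {
            try
            {
                // Stand-in for the Kafka consume loop (assumption): each item
                // here plays the role of one consumed message value.
                foreach (var message in source)
                {
                    await channel.Writer.WriteAsync(message, ct);
                }
            }
            finally
            {
                // Signal the workers that no more messages will arrive.
                channel.Writer.Complete();
            }
        }, ct);

        // Each worker drains the same channel until it is completed.
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var message in channel.Reader.ReadAllAsync(ct))
                {
                    await handler(message);
                }
            }, ct))
            .ToArray();

        await Task.WhenAll(workers.Append(producer));
    }
}
```

With several workers, messages are no longer processed strictly in the order they were consumed, and error handling inside the workers is left out here, so both would need attention before using something like this for real.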

You may also find my thoughts on the Kafka consumer in C# in the article.

If you have any questions, please feel free to ask! I'll be glad to help you

K.Zakiev

You can commit after a set of offsets instead of committing on each offset, which could give you some performance benefit.

// Commit only on every 5th offset rather than after every message.
if (result.Offset.Value % 5 == 0)
{
    consumer.Commit(result);
}

This assumes EnableAutoCommit = false in the consumer configuration.
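
For reference, a minimal sketch of a consumer configuration with auto-commit turned off. The broker address and group name are placeholders, and what the question's `GetConfig()` actually returns is not shown, so treat every value here as an assumption:

```csharp
using Confluent.Kafka;

// Hypothetical values; the question's GetConfig() is not shown.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",       // placeholder broker address
    GroupId = "revenue-response-consumers",    // placeholder consumer group
    EnableAutoCommit = false,                  // offsets are committed manually via consumer.Commit(...)
    AutoOffsetReset = AutoOffsetReset.Earliest // where to start when the group has no committed offset
};
```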