We are using the .NET Kafka client to consume messages from one topic in a C# code. However, it seems to be a wee bit too slow.
Wondering if we could parallelize the process a bit, so I checked this answer there: Kafka how to consume one topic parallel
But I don't really see how to implement this partition thing with the .NET Kafka client in my example below:
var consumerBuilder = new ConsumerBuilder<Ignore, string>(GetConfig())
.SetErrorHandler((_, e) => _logger.LogError("Kafka consumer error on Revenue response. {@KafkaConsumerError}", e));
using (var consumer = consumerBuilder.Build())
{
consumer.Subscribe(RevenueResponseTopicName);
try
{
while (!stoppingToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(stoppingToken);
RevenueTopicResponseModel revenueResponse;
try
{
revenueResponse = JsonConvert.DeserializeObject<RevenueTopicResponseModel>(consumeResult.Value);
}
catch
{
_logger.LogCritical("Impossible to deserialize the response. {@RevenueConsumeResult}", consumeResult);
continue;
}
_logger.LogInformation("Revenue response received from Kafka. {RevenueTopicResponse}",
consumeResult.Value);
await _revenueService.RevenueResultReceivedAsync(revenueResponse);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation($"Operation canceled. Closing {nameof(RevenueResponseConsumer)}.");
consumer.Close();
}
catch (Exception e)
{
_logger.LogCritical(e, $"Unhandled exception during {nameof(RevenueResponseConsumer)}.");
}
}