
I am writing an integration test for my app and would like to test its exception handling. Specifically, I need my Kafka producer's DeliveryReport to report a Kafka NetworkException (ErrorCode.NetworkException), which will trigger my app's NetworkException logic and let me see how well the application handles it.

Below is my application's Send() logic, which produces a message via Kafka, along with the delivery-report handler that is passed to the producer. The handler processes the Kafka response and is what should notify me of a NetworkException if one occurs:

        public void Send(
            ProducerMessage<TKey, TValue> producerMessage, 
            string topic, 
            Action<McFlowProducerResult<TKey, TValue>> callback = null)
        {
            
            _producer.Produce(topic, producerMessage,  
                    deliveryReport => HandleDeliveryReport(deliveryReport, callback));
            
        }


        private Task HandleDeliveryReportErrorAsync(DeliveryReport<string, string> deliveryReport)
        {
            if (deliveryReport.Error.Code == ErrorCode.NetworkException)
            {
                _logger.LogError("Sending message to DynamoDb");
                return _fatalErrorHandler
                    .HandleError(new McFlowProducerResult<string, string>(deliveryReport));
            }

            return Task.CompletedTask;
        }
        
        private void HandleDeliveryReport(DeliveryReport<string, string> deliveryReport,
            Action<McFlowProducerResult<TKey,TValue>> callback)
        {
            var mcflowProducerResult = new McFlowProducerResult<string, string>(deliveryReport);
            
            if (!deliveryReport.Error.IsError)
            {
                _logger.LogInformation("Message Sent successfully  to DLQ TOPIC");
                callback?.Invoke(mcflowProducerResult as McFlowProducerResult<TKey, TValue>);
                return; // success: skip the error logging and error handling below
            }
            
            _logger.LogError("Unable to send the message to DLQ TOPIC: {Topic}. Error Reason :{Error}", 
                deliveryReport.Topic, deliveryReport.Error.Reason);
            
            HandleDeliveryReportErrorAsync(deliveryReport).ConfigureAwait(false).GetAwaiter().GetResult();
        }

Below is my integration test, which successfully produces messages to the specified topic and consumes them. The problem is that I want to deliberately cause a Kafka NetworkException so that I can see the HandleDeliveryReportErrorAsync logic in action:

        [Fact]
        public async Task CreateNetworkException()
        {
            const string dlqTopicName = TOPIC_NAME + "-producer-dlq";
            var serializedProducerMessageKey = _mcFlowMessageSerializer.Serialize(_producerMessage.Message.Key);
            
            await _topicFixture.CreateTopic(dlqTopicName, 1, 1);
            
            var testUtil = new ConsumerTestHarness(_confluentConsumerConfig);
            
            var deliveryReportTaskCompletionSource = new TaskCompletionSource<McFlowProducerResult<string, object>>();
            McFlowProducerResult<string, object> callbackResult = null;

            _mcFlowDlqProducer.Send(_producerMessage, dlqTopicName,
                (deliveryResult) =>
                {
                    callbackResult = deliveryResult;
                    deliveryReportTaskCompletionSource.SetResult(deliveryResult);
                });
            
            await deliveryReportTaskCompletionSource.Task;
            
            testUtil.Consume(dlqTopicName, TimeSpan.FromSeconds(5));
            var results = testUtil.Results;
            
            var expectedMessage 
                = results.Find(message 
                    => message.Message.Key.Equals(serializedProducerMessageKey));
            
            Assert.Equal(serializedProducerMessageKey, expectedMessage.Message.Key);
        }

This is the ProducerConfig I'm using to create my Kafka Producer in my integration test:

        var confluentProducerConfig = new ProducerConfig()
        {
            BootstrapServers = "localhost:29092/1",
            Acks = Acks.All    
        };

How can I create the conditions for a Kafka NetworkException to occur?
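
One alternative I have considered, which avoids a real network fault entirely, is to build the DeliveryReport by hand and pass it straight to a delivery handler. The snippet below is only a minimal sketch under assumptions: it relies on the public `DeliveryReport<TKey, TValue>`, `Error`, and `ErrorCode` types from Confluent.Kafka, the topic name and message contents are made up, and the `handler` delegate is a hypothetical stand-in for my real HandleDeliveryReport method. It exercises the error branch but not the broker or the network, so it does not replace the integration test I am asking about.

```csharp
using System;
using Confluent.Kafka;

// Hand-built delivery report that claims a NetworkException occurred.
// Topic name, key, and value are illustrative placeholders.
var report = new DeliveryReport<string, string>
{
    Topic = "example-topic-producer-dlq",
    Message = new Message<string, string> { Key = "key", Value = "value" },
    Error = new Error(ErrorCode.NetworkException, "simulated network failure")
};

// Hypothetical stand-in for the application's delivery-report handler.
Action<DeliveryReport<string, string>> handler = r =>
{
    if (r.Error.IsError && r.Error.Code == ErrorCode.NetworkException)
    {
        Console.WriteLine($"NetworkException path hit: {r.Error.Reason}");
    }
};

// Invoke the handler directly, as the producer would after a failed delivery.
handler(report);
```

In the real code the handler is private, so this would presumably require either exposing it to the test assembly or substituting the producer with a test double that invokes the captured delivery handler with such a report.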

Vismark Juarez
  • You could try a MonkeyPolicy injection of a SocketException or a [NetworkException](https://learn.microsoft.com/en-us/dotnet/api/microsoft.informationprotection.exceptions.networkexception?view=mipsdk-dotnet-1.12). There is some debate about this, and YMMV: https://stackoverflow.com/a/73133324/495455 – Jeremy Thompson Dec 14 '22 at 00:01
