I am writing an integration test for my app and would like to test my exception-handling functionality. More specifically, I need my Kafka Producer's DeliveryReport
to claim that a Kafka NetworkException
has occurred -- which will trigger my app's Kafka NetworkException logic. This will allow me to see how well my application handles a Kafka NetworkException
.
Below is my application's Send()
logic -- which produces a message via Kafka, along with the DeliveryResult
method that gets passed to the producer -- this handles the Kafka response; which is what should notify me of a NetworkException if one should occur:
public void Send(
ProducerMessage<TKey, TValue> producerMessage,
string topic,
Action<McFlowProducerResult<TKey, TValue>> callback = null)
{
_producer.Produce(topic, producerMessage,
deliveryReport => HandleDeliveryReport(deliveryReport, callback));
}
private Task HandleDeliveryReportErrorAsync(DeliveryReport<string, string> deliveryReport)
{
if (deliveryReport.Error.Code == ErrorCode.NetworkException)
{
_logger.LogError("Sending message to DynamoDb");
return _fatalErrorHandler
.HandleError(new McFlowProducerResult<string, string>(deliveryReport));
}
return Task.CompletedTask;
}
private void HandleDeliveryReport(DeliveryReport<string, string> deliveryReport,
Action<McFlowProducerResult<TKey,TValue>> callback)
{
var mcflowProducerResult = new McFlowProducerResult<string, string>(deliveryReport);
if (!deliveryReport.Error.IsError)
{
_logger.LogInformation("Message Sent successfully to DLQ TOPIC");
callback?.Invoke(mcflowProducerResult as McFlowProducerResult<TKey, TValue>);
}
_logger.LogError("Unable to send the message to DLQ TOPIC: {Topic}. Error Reason :{Error}",
deliveryReport.Topic, deliveryReport.Error.Reason);
HandleDeliveryReportErrorAsync(deliveryReport).ConfigureAwait(false).GetAwaiter().GetResult();
}
Below is my integration test, which successfully produces messages into the specified topic and consumes them just fine. The problem is, I want to purposely cause a Kafka NetworkException
, so that I can see my DeliveryHandler
's HandleDeliveryReportErrorAsync
logic in action:
[Fact]
public async Task CreateNetworkException()
{
const string dlqTopicName = TOPIC_NAME + "-producer-dlq";
var serializedProducerMessageKey = _mcFlowMessageSerializer.Serialize(_producerMessage.Message.Key);
await _topicFixture.CreateTopic(dlqTopicName, 1, 1);
var testUtil = new ConsumerTestHarness(_confluentConsumerConfig);
var deliveryReportTaskCompletionSource = new TaskCompletionSource<McFlowProducerResult<string, object>>();
McFlowProducerResult<string, object> callbackResult = null;
_mcFlowDlqProducer.Send(_producerMessage, dlqTopicName,
(deliveryResult) =>
{
callbackResult = deliveryResult;
deliveryReportTaskCompletionSource.SetResult(deliveryResult);
});
await deliveryReportTaskCompletionSource.Task;
testUtil.Consume(dlqTopicName, TimeSpan.FromSeconds(5));
var results = testUtil.Results;
var expectedMessage
= results.Find(message
=> message.Message.Key.Equals(serializedProducerMessageKey));
Assert.Equal(serializedProducerMessageKey, expectedMessage.Message.Key);
}
This is the ProducerConfig
I'm using to create my Kafka Producer in my integration test:
var confluentProducerConfig = new ProducerConfig()
{
BootstrapServers = "localhost:29092/1",
Acks = Acks.All
};
How can I create the conditions for a Kafka NetworkException
to occur?