
I am trying to write an integration test for Kafka messaging in my application. The tests are flaky: sometimes they pass and sometimes they fail. When they fail, the line below in the test class returns no messages. I am not sure why it returns a message on some runs and not on others. Any help in resolving this is greatly appreciated. All the configuration is in a yml file.

Map<String, MessageResponse> message = messageResponseTestListener.getMessages(); 

Producer

@KafkaClient
public interface MessageRequestTestProducer {

     @Topic("${kafka.messageRequestTopic}")
     Single<RecordMetadata> sendMessageRequest(@KafkaKey String id, MessageRequest messageRequest);
} 

Consumer

@Singleton
@KafkaListener(groupId = "test", offsetReset = OffsetReset.EARLIEST)
public class MessageResponseTestListener {

    public CountDownLatch getLatch() {
        return latch;
    }

    private CountDownLatch latch = new CountDownLatch(1);

    private Map<String, MessageResponse> messages = new HashMap<>();

    @Topic("${kafka.messageResponseTopic}")
    public Single<MessageResponse> eventOccurred(@KafkaKey String key, Single<MessageResponse> recordSingle) {
        return recordSingle.doOnSuccess(record -> {
                    messages.put(key, record);
                    latch.countDown();
                }
        );

    }

    public Map<String, MessageResponse> getMessages() {
        return messages;
    }
}
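
A note on the listener above: the `@Topic` method runs on a Kafka consumer thread, while the test reads `messages` from the test thread. A successful `latch.await(...)` does make the earlier `put` visible, but after a timeout the test reads a plain `HashMap` that another thread may still be writing to. Also, a `CountDownLatch` cannot be reset, so if several tests share the same listener instance, a latch with a count of 1 stays released once any record has arrived. The following is a minimal, framework-free sketch of a collector that uses a concurrent map and takes the expected count up front, so a fresh instance can be created per test. `ResponseCollector`, `record`, `awaitAll` and `snapshot` are hypothetical names for illustration, not Micronaut or Kafka APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of Micronaut or Kafka.
class ResponseCollector<V> {
    // Safe to write from the consumer thread and read from the test thread.
    private final Map<String, V> messages = new ConcurrentHashMap<>();
    private final CountDownLatch latch;

    ResponseCollector(int expectedCount) {
        this.latch = new CountDownLatch(expectedCount);
    }

    // Intended to be called from the listener method for every record received.
    void record(String key, V value) {
        messages.put(key, value);
        latch.countDown();
    }

    // Returns false if the timeout elapsed before the expected records arrived.
    boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    // Returns a copy so the caller never observes later writes.
    Map<String, V> snapshot() {
        return new HashMap<>(messages);
    }
}

class ResponseCollectorDemo {
    public static void main(String[] args) throws InterruptedException {
        ResponseCollector<String> collector = new ResponseCollector<>(1);
        // Stand-in for the Kafka consumer thread delivering one record.
        Thread consumer = new Thread(() -> collector.record("123", "response"));
        consumer.start();
        boolean arrived = collector.awaitAll(5, TimeUnit.SECONDS);
        System.out.println("arrived=" + arrived + ", messages=" + collector.snapshot());
    }
}
```

In the listener, `eventOccurred` would delegate to something like `record(key, value)`. Whether this is the actual cause of the intermittent failures is not certain; it only removes two possible sources of noise.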

KafkaTestBase

public class KafkaTestBase implements TestPropertyProvider {

    protected static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @Override
    public Map<String, String> get() {
        return null;
    }

    @NonNull
    @Override
    public Map<String, String> getProperties() {
        // Point the application at the Testcontainers broker
        Map<String, String> propertyOverrideMap = new HashMap<>();
        propertyOverrideMap.put("kafka.bootstrap.servers", kafka.getBootstrapServers());

        return propertyOverrideMap;
    }

    // Start the container once, before any test context is created
    static {
        kafka.start();
    }
}

Test Class

@Inject
MessageRequestTestProducer messageRequestTestProducer;

@Inject
MessageResponseTestListener messageResponseTestListener;

@Test
void test() throws InterruptedException {

    TEST("Message Testing Kafka version");

    String id = "123";
    MessageRequest messageRequest = buildCorrectRequest();
    messageRequestTestProducer.sendMessageRequest(id, messageRequest).blockingGet();

    messageResponseTestListener.getLatch().await(2, TimeUnit.SECONDS);
    WHEN("Message event received");

    Map<String, MessageResponse> message = messageResponseTestListener.getMessages();

    THEN("Message is produced");

    assertTrue(message.containsKey(id));
    assertEquals(buildExpectedMessageResponse(), message.get(id));

}
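
One detail in the test above: `CountDownLatch.await(timeout, unit)` returns a boolean, and the test ignores it. When it returns `false` the wait timed out, so the later `containsKey` assertion fails without saying why. A minimal sketch of failing fast on the timeout follows; `LatchAwaitDemo` and `awaitOrFail` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchAwaitDemo {
    // Hypothetical helper: waits on the latch and fails loudly if the timeout elapses.
    static void awaitOrFail(CountDownLatch latch, long timeout, TimeUnit unit) {
        boolean completed;
        try {
            completed = latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the message", e);
        }
        if (!completed) {
            throw new IllegalStateException(
                    "No message received within " + timeout + " " + unit);
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        // Stand-in for the consumer thread that counts the latch down.
        new Thread(latch::countDown).start();
        awaitOrFail(latch, 5, TimeUnit.SECONDS);
        System.out.println("latch released in time");
    }
}
```

In a JUnit 5 test the same idea can be written as `assertTrue(messageResponseTestListener.getLatch().await(2, TimeUnit.SECONDS), "no response received")`, which at least separates "nothing arrived in time" from "the wrong message arrived".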
Abhinav Mehrotra
  • It's going to be difficult to debug this, as a lot will depend on the producer and consumer configs and other actors on your Kafka environment. Seemingly you're purging your topic in between runs (otherwise `OffsetReset.EARLIEST` would rewind and find messages from previous runs). I would however look at settings such as the [producer linger](https://stackoverflow.com/questions/51521737) config - perhaps 2 seconds isn't sufficient for your test consumer. – StuartLC May 18 '21 at 15:20
  • If you're running your tests against an actual Kafka cluster, there's a high probability your tests will fail occasionally. Start an embedded broker like the Kafka project itself does for its own tests – OneCricketeer May 19 '21 at 00:02
  • @OneCricketeer - Sorry I am new to Kafka. Could you please explain what you mean by "starting an embedded broker like Kafka project"? – Abhinav Mehrotra May 19 '21 at 08:26
  • Embedded == "in memory". Most popular is probably spring one https://www.baeldung.com/spring-boot-Kafka-testing Or look at micronaut itself https://github.com/micronaut-projects/micronaut-kafka/blob/master/kafka/src/test/groovy/io/micronaut/configuration/kafka/AbstractEmbeddedServerSpec.groovy#L11 (uses https://www.testcontainers.org/modules/kafka/) – OneCricketeer May 19 '21 at 11:45
  • @OneCricketeer I am using the embedded one. I have updated the question itself with the implementation of the KafkaTestBase class. Is that what you meant? I am still getting the same issue. – Abhinav Mehrotra May 19 '21 at 12:18
  • @StuartLC I have also tried increasing the wait from 2 seconds to 5 and 10 seconds, but I am still unable to get the response on the consumer. I do not fully understand the producer linger link you provided, as I am not setting any batch sizes anywhere. – Abhinav Mehrotra May 19 '21 at 12:31
  • I'm not sure what `implements TestPropertyProvider` gives you, but okay, so you are using Testcontainers... Maybe your "await 2 seconds" call is too short for the tests? Hard to really say – OneCricketeer May 19 '21 at 12:32
  • The producer itself, by default, waits (or lingers) for additional information to be produced. Kafka doesn't immediately send every record to the broker – OneCricketeer May 19 '21 at 12:33
  • @OneCricketeer So, then for how long should we wait for the producer to send the record and consumer to consume it? – Abhinav Mehrotra May 19 '21 at 13:29
  • I'm not sure. Your `.blockingGet()` should handle that on its own – OneCricketeer May 19 '21 at 13:48
  • Worth pointing out that using a HashMap is probably an anti-pattern anyway. Use a KTable instead and `groupByKey` from a KStream – OneCricketeer May 19 '21 at 13:50
  • What do you mean by "`.blockingGet()` should handle that on its own"? – Abhinav Mehrotra May 19 '21 at 13:59

0 Answers