We have an integration test that uses EmbeddedKafka: it produces a message to one topic, our app processes that message, and the result is sent to a second topic, where the test consumes it and asserts on the output. In CI this works roughly two out of three runs, but in the remaining runs KafkaTestUtils.getSingleRecord throws java.lang.IllegalStateException: No records found for topic (see [1] below).

To try to resolve this, I added ContainerTestUtils.waitForAssignment for each listener container in the registry (see [2] below). After a few successful runs in CI, I saw a new exception: java.lang.IllegalStateException: Expected 1 but got 0 partitions. This now has me wondering whether that was the actual root cause of the original "No records found" exception.

Any ideas what could help with the random failures here? I would appreciate any suggestions on how to troubleshoot.

We are using spring-kafka and spring-kafka-test v2.6.4.

Edit: Added newConsumer for reference.

Example of our setup:

@SpringBootTest
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(
    topics = { "topic1","topic2" },
    partitions = 1,
    brokerProperties = {"listeners=PLAINTEXT://localhost:9099", "port=9099"})
public class IntegrationTest {

  @Autowired
  private EmbeddedKafkaBroker embeddedKafkaBroker;

  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

  @Test
  public void testExample() {
    try (Consumer<String, String> consumer = newConsumer()) {
      for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
        // [2]
        ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
      }

      try (Producer<String, String> producer = newProducer()) {
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "topic2"); // [1]

        producer.send(new ProducerRecord<>(
            "topic1",
            "test payload"));
        producer.flush();
      }

      String result = KafkaTestUtils.getSingleRecord(consumer, "topic2").value();
      assertEquals("expected result", result);
    }
  }

  private Consumer<String, String> newConsumer() {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupId", "false", embeddedKafkaBroker);
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(
        consumerProps,
        new StringDeserializer(),
        new CustomDeserializer<>());
    return consumerFactory.createConsumer();
  }
}
archonic
  • You need to show `newConsumer()` - for the first problem, have you set `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` to `"earliest"`? Why are you waiting for a container to get an assignment when you plan to receive the record from your `Consumer`, not the container's consumer? – Gary Russell Mar 18 '21 at 21:37
  • @GaryRussell Added `newConsumer` for reference. We didn't set "earliest" here as we're trying to mimic our app's behavior, but we might end up just doing that. As for waiting for the assignment, I was honestly just looking at other threads for a way to programmatically stall until things are "ready". – archonic Mar 18 '21 at 22:00
  • That generally is the cause of these random test failures because the record can be stored before the consumer starts and by default the starting offset is set to “latest”. – Gary Russell Mar 18 '21 at 22:08
  • @GaryRussell I saw this prescribed in other posts. Is there any other way to reliably wait for a consumer to be ready before producing? – archonic Mar 18 '21 at 22:09
  • Hmmm - that's weird. I just noticed that you have `embeddedKafkaBroker.consumeFromAnEmbeddedTopic()` - that will wait until the partitions are assigned (or throw an exception if it doesn't happen), avoiding the race condition, so the offset reset setting shouldn't matter. I am not sure what's going on now. But certainly, waiting for the container assignment is wrong. – Gary Russell Mar 18 '21 at 22:21
  • Dude, if you have dug into this topic in the year since this question, could you please help me find the root cause of this strange behaviour? Local tests are OK, but when I run them through Maven in TeamCity, they fail just like yours. – Alexander.Iljushkin Aug 19 '22 at 21:34
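
The offset reset setting raised in the comments can be sketched as follows. This is a minimal illustration, not a confirmed fix: the class `ConsumerPropsSketch` and the method `withEarliestOffsetReset` are hypothetical names that do not exist in spring-kafka or spring-kafka-test. The key is written as the plain string `"auto.offset.reset"`, which is the value of `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`, so the snippet compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; not part of spring-kafka or spring-kafka-test. */
final class ConsumerPropsSketch {

    // String value of ConsumerConfig.AUTO_OFFSET_RESET_CONFIG in kafka-clients.
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    private ConsumerPropsSketch() {
    }

    /**
     * Returns a copy of the given consumer properties with the offset reset
     * policy set to "earliest". The input map is left untouched.
     */
    static Map<String, Object> withEarliestOffsetReset(Map<String, Object> baseProps) {
        Map<String, Object> props = new HashMap<>(baseProps);
        props.put(AUTO_OFFSET_RESET, "earliest");
        return props;
    }
}
```

In the question's `newConsumer()`, the map returned by `KafkaTestUtils.consumerProps(...)` could be passed through such a helper before it is handed to `DefaultKafkaConsumerFactory`. As noted in the comments, `consumeFromAnEmbeddedTopic` should already wait for partition assignment, so this setting may not address the root cause of the intermittent failure.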

0 Answers