We have an integration test where we use EmbeddedKafka and produce a message to a topic, our app processes that message, and the result is sent to a second topic where we consume and assert the output. In CI this works maybe 2/3 of the time, but we will hit cases where KafkaTestUtils.getSingleRecord
throws java.lang.IllegalStateException: No records found for topic
(See [1] below).
To try and resolve this, I added ContainerTestUtils.waitForAssignment
for each listener container in the registry (See [2] below). After a few successful runs in CI, I saw a new exception: java.lang.IllegalStateException: Expected 1 but got 0 partitions
. This now has me wondering if this was actually the root cause of the original exception of no records found.
Any ideas what could help with the random failures here? I would appreciate any suggestions on how to troubleshoot.
spring-kafka and spring-kafka-test v2.6.4.
Edit: Added newConsumer
for reference.
Example of our setup:
@SpringBootTest
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(
topics = { "topic1","topic2" },
partitions = 1,
brokerProperties = {"listeners=PLAINTEXT://localhost:9099", "port=9099"})
public class IntegrationTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Test
public void testExample() {
try (Consumer<String, String> consumer = newConsumer()) {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
[2]
ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
}
try (Producer<String, String> producer = newProducer()) {
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "topic2"); // [1]
producer.send(new ProducerRecord<>(
"topic1",
"test payload"));
producer.flush();
}
String result = KafkaTestUtils.getSingleRecord(consumer, "topic2").value();
assertEquals(result, "expected result");
}
}
private Consumer<String, String> newConsumer() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupId", "false", embeddedKafkaBroker);
ConsumerFactory<String, AssetTransferResponse> consumerFactory = new DefaultKafkaConsumerFactory<>(
consumerProps,
new StringDeserializer(),
new CustomDeserializer<>());
return consumerFactory.createConsumer();
}
}