I have a @KafkaListener consumer and want to write an integration test for it.
The problem is that it is hard to determine the exact moment when Consumer#consume has finished executing, so that I can run assertions after the message has been processed and the database state has changed.
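@Component
public class Consumer {
    private final Service service;

    // Constructor injection, so that the final field is initialized.
    public Consumer(Service service) {
        this.service = service;
    }

    @KafkaListener(id = "id", groupId = "group", topics = "topic", containerFactory = "factory")
    public void consume(@Payload Message message, Acknowledgment acknowledgment) {
        service.process(message);
        // Acknowledge only after processing has completed.
        acknowledgment.acknowledge();
    }
}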
Test:
@SpringBootTest
@EmbeddedKafka
class Testing {
    // some useful beans (including the embeddedKafka broker used below)

    @SpyBean
    private Consumer consumer;

    @Test
    void shouldConsume() throws Exception {
        Message message = new Message();
        String topic = "topic";
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        // Assumes senderProps is configured with a value serializer that can handle Message.
        try (KafkaProducer<String, Message> producer = new KafkaProducer<>(senderProps)) {
            producer.send(new ProducerRecord<>(topic, message)).get(1L, TimeUnit.SECONDS);
        }
        Mockito.verify(consumer, Mockito.timeout(1_000L)).consume(any(Message.class), any(Acknowledgment.class));
        // perform some asserts
    }
}
If I put Thread.sleep(1000L) in the test, the consumer processes the message and everything works. With Mockito it does not work, because the assertions run before Consumer#consume has finished executing.
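To make the Thread.sleep workaround less arbitrary, the fixed sleep could be replaced by polling for the expected state. The following is only a sketch in plain Java, with no Kafka or Spring involved: PollingWait and awaitUntil are hypothetical names, and the AtomicBoolean merely stands in for whatever database check the real test would perform.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class PollingWait {

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition became true in time, false on timeout or interruption.
    public static boolean awaitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for "the row in the database has been updated".
        AtomicBoolean processed = new AtomicBoolean(false);

        // Simulates the listener thread finishing its work a little later.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            processed.set(true);
        });
        worker.start();

        boolean met = awaitUntil(processed::get, 2_000, 20);
        System.out.println("condition met: " + met);
        worker.join();
    }
}
```

In the real test, the condition would presumably be a query against the database, and the assertions would run only after awaitUntil returns true. This still does not tell me the exact moment the listener finished, which is what I am really asking about.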
Is there a way (using listeners, etc.) to catch the moment when the @KafkaListener consumer has acknowledged or finished processing a message, so that I can run assertions against the resulting database state? The integration test is needed to make sure the end-to-end functionality works.
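To illustrate the kind of hook I have in mind, here is a plain-Java sketch of a completion signal based on a CountDownLatch. It is not Kafka or Spring code: CompletionSignal, FakeListener and processAndAwait are hypothetical names, and the string assignment stands in for service.process(message). Whether something equivalent can be attached to a real @KafkaListener is exactly what I do not know.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CompletionSignal {

    // Hypothetical stand-in for the listener: does its work, then signals completion.
    static class FakeListener {
        private final CountDownLatch done;
        private volatile String state = "initial";

        FakeListener(CountDownLatch done) {
            this.done = done;
        }

        void consume(String message) {
            try {
                state = "processed:" + message; // stands in for service.process(message)
            } finally {
                done.countDown(); // fires only after processing has finished
            }
        }

        String state() {
            return state;
        }
    }

    // Runs consume on a separate thread and blocks until the completion signal arrives.
    // Returns the resulting state, or null on timeout or interruption.
    public static String processAndAwait(String message, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        FakeListener listener = new FakeListener(done);
        new Thread(() -> listener.consume(message)).start();
        try {
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return listener.state();
    }

    public static void main(String[] args) {
        System.out.println(processAndAwait("hello", 1_000));
    }
}
```

The point of the sketch is that the waiting thread is released after the work is done, not when the method is entered, so assertions made afterwards would see the final state.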
I also tried #verify checks on a @SpyBean private Service service (method Service#process), but that does not work either.