In integration testing, having fixed ports like 9092 is not recommended because multiple tests should have the flexibility to open their own ports from embedded instances. So, following implementation is something like that,
NB: this implementation is based on junit5(Jupiter:5.7.0) and spring-boot 2.3.4.RELEASE
TestClass:
@EnableKafka
@SpringBootTest(classes = {ConsumerTest.Config.class, Consumer.class})
@EmbeddedKafka(
partitions = 1,
controlledShutdown = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ConsumerTest {
@Autowired
private EmbeddedKafkaBroker kafkaEmbedded;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@BeforeAll
public void setUp() throws Exception {
for (final MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
kafkaEmbedded.getPartitionsPerTopic());
}
}
@Value("${topic.name}")
private String topicName;
@Autowired
private KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate;
@Test
public void consume_success() {
requestKafkaTemplate.send(topicName, load);
}
@Configuration
@Import({
KafkaListenerConfig.class,
TopicConfig.class
})
public static class Config {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestProducerFactory() {
final Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate() {
return new KafkaTemplate<>(requestProducerFactory());
}
}
}
Listener Class:
@Component
public class Consumer {
@KafkaListener(
topics = "${topic.name}",
containerFactory = "listenerContainerFactory"
)
@Override
public void listener(
final ConsumerRecord<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> consumerRecord,
final @Payload Optional<Map<String, List<ImmutablePair<String, String>>>> payload
) {
}
}
Listner Config:
@Configuration
public class KafkaListenerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${topic.name}")
private String resolvedTreeQueueName;
@Bean
public ConsumerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeConsumerFactory() {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, resolvedTreeQueueName);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new CustomDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(resolvedTreeConsumerFactory());
return factory;
}
}
TopicConfig:
@Configuration
public class TopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${topic.name}")
private String requestQueue;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic requestTopic() {
return new NewTopic(requestQueue, 1, (short) 1);
}
}
application.properties:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
This assignment is the most important assignment that would bind the embedded instance port to the KafkaTemplate and, KafkaListners.
Following the above implementation, you could open dynamic ports per test class and, it would be more convenient.