` @RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {EmbeddedKafkaIntegrationTest.TEST_TOPIC}, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@ActiveProfiles("test")
public class EmbeddedKafkaIntegrationTest {

static final String TEST_TOPIC = "MERCHANT-SERVICE-BILLABLE-EVENTS-DPG-IN";
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String offsetReset;
@Autowired
private MessagePublisher producer;
@Autowired
private KafkaConsumer consumer;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Captor
ArgumentCaptor<ConsumerRecord<String, BillableEventsRequest>> billableEventsRequestArgumentCaptor;
@Captor
ArgumentCaptor<String> topicArgumentCaptor;
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}`

@Test
public void embeddedKafka_whenSendingToProducer_thenMessageReceived() throws IOException, ParseException {
    //Producer
    BillableEventsRequest event = createTestRequest();
    producer.publish(event);
    //consumer
    verify(consumer, timeout(1000).times(1)).consume(billableEventsRequestArgumentCaptor.capture());
    ConsumerRecord<String, BillableEventsRequest> payload = billableEventsRequestArgumentCaptor.getValue();
    assertNotNull(payload);
    assertTrue(TEST_TOPIC.contains(topicArgumentCaptor.getValue()));
    testEvents(payload,event);
}

}

Spring Boot: v2.3.5.RELEASE
Kafka version: 2.5.1
Apache Camel: 3.9.0
Java: v11.0.15
zookeeper.version=3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 20:03 GMT

When I start my embedded Kafka test locally, it fails with the errors below.

Error logs:

When IP address:9092 is given for spring.kafka.producer.bootstrap-servers:

[Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Connection to node -1 (/10.165.101.110:9092) could not be established. Broker may not be available.
2022-10-17 17:49:50.643  WARN 1820 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Bootstrap broker 10.165.101.110:9092 (id: -1 rack: null) disconnected


2022-10-17 17:49:52.006  WARN 1820 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (INPNQLT896845.in.db.com/10.165.101.110:9092) could not be established. Broker may not be available.
2022-10-17 17:49:52.006  WARN 1820 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker 10.165.101.110:9092 (id: -1 rack: null) disconnected

When localhost:9092 is given for spring.kafka.producer.bootstrap-servers:

2022-10-17 17:19:26.743  WARN 27272 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-10-17 17:19:26.743  WARN 27272 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

2022-10-17 17:19:27.687  WARN 27272 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-10-17 17:19:27.688  WARN 27272 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

At the end of the logs I get this:

[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 73.128 s <<< FAILURE! - in com.db.spricing.ms.broker.EmbeddedKafkaIntegrationTest
[ERROR] embeddedKafka_whenSendingToProducer_thenMessageReceived  Time elapsed: 60.24 s  <<< ERROR!
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test.in.topic not present in metadata after 60000 ms.
    at com.db.spricing.ms.broker.EmbeddedKafkaIntegrationTest.embeddedKafka_whenSendingToProducer_thenMessageReceived(EmbeddedKafkaIntegrationTest.java:89)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test.in.topic not present in metadata after 60000 ms.
Trups
  • Post your test class code to increase the chance of getting answers that might help. However, the issue might be a duplicate of this: https://stackoverflow.com/questions/63714401/org-apache-kafka-common-errors-timeoutexception-topic-not-present-in-metadata-a – Mo_- Oct 17 '22 at 13:21
  • Do not use localhost, or a hard-coded address. The embedded broker has its own method to get its address – OneCricketeer Oct 17 '22 at 14:01

1 Answer

It is hard to help you, since that Spring Boot version has been out of support for a while: https://spring.io/projects/spring-boot#support.

See if this option helps you anyway: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.embedded.

spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
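The placeholder can work because the embedded broker publishes its address under the `spring.embedded.kafka.brokers` system property when it starts, and the value is looked up from there when the application context is built. The following is only a rough sketch of that lookup in plain Java, not Spring's actual resolver; the class `PlaceholderSketch`, its `resolve` method, and the address `127.0.0.1:54321` are hypothetical and exist only for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: mimics how a ${...} placeholder could be filled from
// system properties. This is NOT Spring's implementation, only an illustration.
class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every ${key} in the value with System.getProperty(key).
    // Unknown keys are left untouched so a missing property is easy to spot.
    static String resolve(String value) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String replacement = System.getProperty(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // Assumption: the embedded broker started on a random port and published it here.
        System.setProperty("spring.embedded.kafka.brokers", "127.0.0.1:54321");
        System.out.println(resolve("${spring.embedded.kafka.brokers}"));
    }
}
```

If the placeholder comes back unresolved in a real test, that usually suggests the embedded broker was not started before the property was read.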

I'm not sure why you set spring.kafka.producer.bootstrap-servers if you need to rely on whatever embedded Kafka exposes for you.
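The producer-specific setting matters because, in Spring Boot, `spring.kafka.producer.bootstrap-servers` overrides the common `spring.kafka.bootstrap-servers` for producers. A hard-coded producer address could therefore win even when the common property points at the embedded broker. Below is a minimal sketch of that precedence using a plain map; `BootstrapPrecedenceSketch` and `effectiveProducerServers` are hypothetical names, not Spring Boot APIs, and the embedded address is made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of "specific overrides common" precedence.
// This is not Spring Boot code.
class BootstrapPrecedenceSketch {

    // Returns the producer-specific address when present, else the common one.
    static String effectiveProducerServers(Map<String, String> config) {
        String common = config.get("spring.kafka.bootstrap-servers");
        String producerSpecific = config.get("spring.kafka.producer.bootstrap-servers");
        return producerSpecific != null ? producerSpecific : common;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Assumed address of the embedded broker (random port).
        config.put("spring.kafka.bootstrap-servers", "127.0.0.1:54321");
        // Hard-coded producer address, as in the question.
        config.put("spring.kafka.producer.bootstrap-servers", "localhost:9092");
        System.out.println(effectiveProducerServers(config));

        // Dropping the producer-specific entry lets the common value apply.
        config.remove("spring.kafka.producer.bootstrap-servers");
        System.out.println(effectiveProducerServers(config));
    }
}
```

Under these assumptions, removing the producer-specific entry from the test profile is enough for the producer to follow the common property.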

Artem Bilan
  • @OneCricketeer - As per your suggestion, I changed the hard-coded address I was using earlier to "{spring.kafka.bootstrap-servers}" in my yaml file. – Trups Oct 18 '22 at 10:48
  • @Mo_-: I added the code so that you can understand it better. – Trups Oct 18 '22 at 11:10
  • Hi Artem Bilan: I'm currently getting the error below. Action: Correct the classpath of your application so that it contains compatible versions of the classes org.springframework.kafka.test.EmbeddedKafkaBroker and kafka.utils.TestUtils – Trups Oct 18 '22 at 11:19
  • So, why the question? Just do that! The respective doc: https://docs.spring.io/spring-kafka/docs/current/reference/html/#update-deps. It is better, though, to rely on whatever version Spring Boot brings for us. But again: yours is too old for us to be able to help you properly – Artem Bilan Oct 18 '22 at 11:29