3

I am using EmbeddedKafkaRule as below in a SpringBootTest class:

    // Name of the topic the embedded broker pre-creates for this test class.
    private static final String TEMPLATE_TOPIC = "templateTopic";

    // Starts a single embedded Kafka broker (1 node, controlled shutdown = true)
    // with TEMPLATE_TOPIC pre-created, before any test in the class runs.
    // NOTE(review): @ClassRule is honoured only by the JUnit 4 runner; if this
    // class executes on the JUnit 5 (Jupiter) engine the rule never fires, the
    // broker is never started, and consumers see bootstrap 127.0.0.1:0 (the
    // exact symptom in the logs below) — confirm which engine runs these tests.
    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);


   @Test
    public void testTemplate() throws Exception {
        // Consumer side: a listener container that funnels every record it
        // receives into a queue the test thread can poll with a timeout.
        final BlockingQueue<ConsumerRecord<Integer, String>> received = new LinkedBlockingQueue<>();

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
                embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProps = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> listenerContainer =
                new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
        listenerContainer.setupMessageListener((MessageListener<Integer, String>) rec -> {
            System.out.println(rec);
            received.add(rec);
        });
        listenerContainer.setBeanName("templateTests");
        listenerContainer.start();
        // Block until the consumer owns every partition of the topic, so that
        // none of the records published below can be missed.
        ContainerTestUtils.waitForAssignment(listenerContainer,
                embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());

        // Producer side: a KafkaTemplate bound to the same embedded broker.
        Map<String, Object> producerProps =
                KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> producerFactory =
                new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(producerFactory);
        template.setDefaultTopic(TEMPLATE_TOPIC);

        // Send to the default topic and assert the value round-trips.
        template.sendDefault("foo");
        assertThat(received.poll(10, TimeUnit.SECONDS), hasValue("foo"));

        // Send with an explicit partition (0) and key (2) to the default topic.
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> rec = received.poll(10, TimeUnit.SECONDS);
        assertThat(rec, hasKey(2));
        assertThat(rec, hasPartition(0));
        assertThat(rec, hasValue("bar"));

        // Send naming the topic explicitly instead of using the default.
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        rec = received.poll(10, TimeUnit.SECONDS);
        assertThat(rec, hasKey(2));
        assertThat(rec, hasPartition(0));
        assertThat(rec, hasValue("baz"));
    }

When 'container.start();' statement is executed, the logs keep printing the below exception-

2020-05-22 19:12:19.307  INFO 50295 --- [    Test worker] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 10
        auto.offset.reset = earliest
        bootstrap.servers = [127.0.0.1:0]
        check.crcs = true
        client.dns.lookup = default
        client.id = 
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = testT
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 60000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

    2020-05-22 19:12:19.590  INFO 50295 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
    2020-05-22 19:12:19.590  INFO 50295 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
    2020-05-22 19:12:19.591  INFO 50295 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1590199939584
    2020-05-22 19:12:19.605  INFO 50295 --- [    Test worker] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-testT-1, groupId=testT] Subscribed to topic(s): templateTopic
    2020-05-22 19:12:19.615  INFO 50295 --- [    Test worker] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
    2020-05-22 19:12:20.157  WARN 50295 --- [mplateTests-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-testT-1, groupId=testT] Error connecting to node 127.0.0.1:0 (id: -1 rack: null)

    java.net.BindException: Can't assign requested address
        at java.base/sun.nio.ch.Net.connect0(Native Method) ~[na:na]
        at java.base/sun.nio.ch.Net.connect(Net.java:482) ~[na:na]
        at java.base/sun.nio.ch.Net.connect(Net.java:474) ~[na:na]
        at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:694) ~[na:na]
        at org.apache.kafka.common.network.Selector.doConnect(Selector.java:277) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.common.network.Selector.connect(Selector.java:255) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:957) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:293) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:495) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:236) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:463) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1089) ~[spring-kafka-2.5.0.RELEASE.jar:2.5.0.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1045) ~[spring-kafka-2.5.0.RELEASE.jar:2.5.0.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:970) ~[spring-kafka-2.5.0.RELEASE.jar:2.5.0.RELEASE]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

    2020-05-22 19:12:20.159  WARN 50295 --- [mplateTests-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-testT-1, groupId=testT] Bootstrap broker 127.0.0.1:0 (id: -1 rack: null) disconnected
    2020-05-22 19:12:20.249  WARN 50295 --- [mplateTests-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-testT-1, groupId=testT] Error connecting to node 127.0.0.1:0 (id: -1 rack: null)

    java.net.BindException: Can't assign requested address
        at java.base/sun.nio.ch.Net.connect0(Native Method) ~[na:na]
        at java.base/sun.nio.ch.Net.connect(Net.java:482) ~[na:na]

I have tried to change the default port from 0 to a non-zero value, but it didn't help, as shown by the messages below -

2020-05-22 16:23:59.388  INFO 21162 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
    2020-05-22 16:23:59.388  INFO 21162 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
    2020-05-22 16:23:59.388  INFO 21162 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1590189839385
    2020-05-22 16:23:59.407  WARN 21162 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9090) could not be established. Broker may not be available.
    2020-05-22 16:23:59.407  WARN 21162 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9090 (id: -1 rack: null) disconnected
    2020-05-22 16:23:59.511  WARN 21162 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9090) could not be established. Broker may not be available.
    2020-05-22 16:23:59.512  WARN 21162 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9090 (id: -1 rack: null) disconnected

I am using org.springframework.kafka:spring-kafka-test:2.5.0.RELEASE. Please let me know if you have faced this issue and resolved it.

Thanks

hsandhu
  • 31
  • 2

2 Answers

0

Spring should call

embeddedKafka.getEmbeddedKafka().afterPropertiesSet();

I can't see why in your code this is not happening, but you can manually call it to ensure the server starts.

You may want to set the ports used by the server first though:

embeddedKafka.setZkPort(freePort());
embeddedKafka.kafkaPorts(freePort());
0

I added the code below, as @cgenrich mentioned before, and it worked for me.

embeddedKafka.kafkaPorts(9092);

Or maybe your kafka and kafka-test versions are not compatible. For example, my kafka was at 2.9.1 but kafka-test was at 3.0.1, so I was getting this error too.

Also I share my setup

@ActiveProfiles("dev")
@DirtiesContext
class IntegrationTest {

    private static final String TOPIC_NAME = "test-topic";

    // Created once for the whole class in setUp(); static to match the
    // static JUnit 5 class-level lifecycle used below.
    private static KafkaMessageListenerContainer<String, Object> container;

    private static BlockingQueue<ConsumerRecord<String, String>> consumerRecords;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // NOTE(review): @ClassRule is a JUnit 4 construct and is ignored by the
    // JUnit 5 (Jupiter) engine — which is why the broker is started manually
    // via afterPropertiesSet() in setUp() below.
    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC_NAME);

    /**
     * Starts the embedded broker and a listener container that funnels every
     * record from {@code TOPIC_NAME} into {@link #consumerRecords}.
     *
     * <p>Fixed: JUnit 5 requires {@code @BeforeAll} methods to be {@code static}
     * (the original was an instance method mutating instance fields, so Jupiter
     * either rejects it or never runs it).
     */
    @BeforeAll
    public static void setUp() {
        consumerRecords = new LinkedBlockingQueue<>();

        ContainerProperties containerProperties = new ContainerProperties(TOPIC_NAME);
        // Single-broker cluster: replication factors must not exceed 1 node.
        embeddedKafka.brokerProperty("offsets.topic.replication.factor", "1");
        embeddedKafka.brokerProperty("transaction.state.log.replication.factor", "1");
        embeddedKafka.brokerProperty("transaction.state.log.min.isr", "1");
        // Port must be configured before the broker is started.
        embeddedKafka.kafkaPorts(9092);
        embeddedKafka.getEmbeddedKafka().afterPropertiesSet();

        Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(
            "dopi-student-activity-1", "false", embeddedKafka.getEmbeddedKafka());

        DefaultKafkaConsumerFactory<String, Object> consumer =
            new DefaultKafkaConsumerFactory<>(consumerProperties);

        container = new KafkaMessageListenerContainer<>(consumer, containerProperties);
        container.setupMessageListener((MessageListener<String, String>) record -> {
            consumerRecords.add(record);
        });
        container.start();
        // NOTE(review): consider ContainerTestUtils.waitForAssignment(...) here;
        // without it the first send can race the consumer's partition assignment.
    }

    /**
     * Stops the listener container after all tests have run.
     *
     * <p>Fixed: the original used JUnit 4's {@code @After}, which Jupiter
     * ignores, so the container was never stopped.
     */
    @AfterAll
    public static void tearDown() {
        container.stop();
    }

    /**
     * Publishes one record via the autowired template and asserts the listener
     * container delivered something within 10 seconds.
     */
    @Test
    @Transactional
    public void test() throws Exception {

        kafkaTemplate.send("test-topic", "test");

        ConsumerRecord<String, String> received = consumerRecords.poll(10, TimeUnit.SECONDS);

        Assertions.assertNotNull(received);
    }
}