I am using Spring for dependency injection, and I have a bean that is a Kafka producer service. It gets its configuration (ZooKeeper server, etc.) from a properties file.

```java
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Service
public class KafkaProducerService implements InitializingBean {

    @Autowired
    private Properties properties;

    private KafkaProducer<String, String> producer;
    private ZkUtils zkUtils;

    public KafkaProducerService() {
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.bootstrap.servers"));
        kafkaProducerProperties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, properties.getProperty("kafka.producer.timeout", "3000"));
        kafkaProducerProperties.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, properties.getProperty("kafka.reconnect.backoff.ms", "1000"));
        kafkaProducerProperties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, properties.getProperty("kafka.producer.timeout", "3000"));
        kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        String zookeeperEndpoint = properties.get("zookeeper.connect") + ":2181";
        this.producer = new KafkaProducer<>(kafkaProducerProperties);
        final ZkClient zkClient = new ZkClient(zookeeperEndpoint, 10000, 10000, ZKStringSerializer$.MODULE$);
        zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperEndpoint), false);
    }

    public boolean publishMessage(final String message, final String topic) {
        try {
            producer.send(new ProducerRecord<>(topic, message))
                    .get(3, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return false;
        }
    }

    public void tearDown() {
        this.producer.close();
    }
}
```

I use this service by autowiring it into other services, and that works well when running the application. I have an additional Spring context for tests that loads an embedded Kafka.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <context:annotation-config/>
    <context:component-scan base-package="my.project.main"/>

    <context:property-placeholder location="classpath*:my_properties.properties"/>

    <bean id="embeddedKafka" class="my.project.main.EmbeddedKafka"
          init-method="setupEmbeddedKafkaWithZookeeper"
          destroy-method="tearDown"/>

    <bean id="properties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="zookeeper.connect">localhost</prop>
                <prop key="kafka.bootstrap.servers">localhost:9092</prop>
                <prop key="acks">1</prop>
                <prop key="kafka.producer.timeout">5000</prop>
                <prop key="kafka.reconnect.backoff.ms">30000</prop>
            </props>
        </property>
    </bean>
</beans>
```

Whenever I run the tests, the Kafka producer service should use the in-memory Kafka that is loaded through the EmbeddedKafka bean. The problem is that in most cases the embedded Kafka takes too long to start, so the Kafka producer times out and Spring fails to instantiate it. Is there any mechanism to make the KafkaProducerService bean "wait" until the EmbeddedKafka bean is instantiated?

mishless
  • Sure it can. Check [Spring Test](https://docs.spring.io/spring-batch/trunk/reference/html/testing.html) – Victor Gubin Apr 18 '18 at 15:51
  • I read through the resource you pointed to, but I couldn't see anything about dependencies between beans. Can you elaborate on your answer? – mishless Apr 18 '18 at 17:31
  • Ok, to simplify. For testing purposes you can provide a test-specific Spring configuration and replace a normal bean with a test-only bean. Check https://stackoverflow.com/questions/28605833/overriding-an-autowired-bean-in-unit-tests – Victor Gubin Apr 18 '18 at 19:48

1 Answer

There are ways to make the producer wait, but I guess you don't want to wait forever. Any wait needs a timeout (say X seconds) in case the embedded Kafka config is incorrect or some other problem occurs, so that the tests do not hang forever.

You can just set the producer's timeout to X seconds and that will do the trick.
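One detail worth noting about the question's code: the two ZkClient timeouts are hard-coded to 10000 ms, so raising `kafka.producer.timeout` alone would not lengthen the ZooKeeper connection attempt. Below is a minimal sketch of reading that value from the same `Properties` object instead, so the test context can raise it. The class name and the `zookeeper.timeout.ms` key are hypothetical and do not appear in the original configuration.

```java
import java.util.Properties;

// Hypothetical helper; the property key below is an assumption, not an existing setting.
final class TimeoutSettings {

    static final String ZK_TIMEOUT_KEY = "zookeeper.timeout.ms";

    // Same value the question passes to the ZkClient constructor.
    static final int DEFAULT_ZK_TIMEOUT_MS = 10_000;

    private TimeoutSettings() {
    }

    /** Returns the configured timeout, or the default when the key is missing or not a number. */
    static int zookeeperTimeoutMs(Properties properties) {
        String raw = properties.getProperty(ZK_TIMEOUT_KEY);
        if (raw == null) {
            return DEFAULT_ZK_TIMEOUT_MS;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return DEFAULT_ZK_TIMEOUT_MS;
        }
    }
}
```

In `afterPropertiesSet` the result could replace both `10000` literals, and the test context could then add a `<prop>` entry with a larger value.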

If you do want to wait forever, bear with me.

Your main goal is to make sure that Kafka is up before the first access to ZooKeeper/Kafka happens.

If that access does not happen during Spring context initialization (you can check this in the stack trace you get on timeout), then all you need to ensure is that the embedded Kafka initialization is done synchronously.

For example, you can create a new bean that invokes setupEmbeddedKafkaWithZookeeper and waits until Kafka is up.
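A rough sketch of the "wait until Kafka is up" part, using only the JDK. `ReadinessWaiter` is a hypothetical helper, not part of Spring or Kafka, and treating "the port accepts TCP connections" as "Kafka is up" is a simplifying assumption; an open port does not guarantee that the broker is fully ready.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Spring or Kafka.
final class ReadinessWaiter {

    private ReadinessWaiter() {
    }

    /** Polls the condition until it holds or timeoutMs elapses; returns whether it held. */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                return false;
            }
        }
    }

    /** Assumption: an accepted TCP connection is taken as a sign that the server is up. */
    static boolean isPortOpen(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 500);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

With the ports from the question, the end of the init method could call something like `ReadinessWaiter.waitUntil(() -> ReadinessWaiter.isPortOpen("localhost", 9092), 60_000, 250)` and throw if it returns false. Blocking there only helps if Spring creates the embeddedKafka bean before KafkaProducerService; Spring's `depends-on` attribute (or `@DependsOn` on the class) is the usual way to request that ordering.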

If ZooKeeper/Kafka are accessed during Spring context initialization, then it is trickier. You need to either

  1. wait until Kafka is up at the moment that access happens,
  2. or initialize Kafka before the Spring context is even created.

To wait for Kafka, you can create a wrapper for the KafkaProducerService that waits until Kafka is up in every method that accesses Kafka/ZooKeeper.
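The wrapper idea might look roughly like the sketch below. Everything here is hypothetical: `MessagePublisher` stands in for the public methods of KafkaProducerService, and the readiness check and the factory are supplied by the caller. Because the question's service connects inside `afterPropertiesSet`, this sketch also assumes the real service is created lazily, only after the readiness check passes.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical interface standing in for the public methods of KafkaProducerService.
interface MessagePublisher {
    boolean publishMessage(String message, String topic);
}

// Hypothetical wrapper: builds the real publisher only once the broker looks ready.
final class WaitingPublisher implements MessagePublisher {

    private final BooleanSupplier brokerReady;
    private final Supplier<MessagePublisher> delegateFactory;
    private final long timeoutMs;
    private final long pollMs;
    private MessagePublisher delegate; // stays null until a readiness check succeeds

    WaitingPublisher(BooleanSupplier brokerReady, Supplier<MessagePublisher> delegateFactory,
                     long timeoutMs, long pollMs) {
        this.brokerReady = brokerReady;
        this.delegateFactory = delegateFactory;
        this.timeoutMs = timeoutMs;
        this.pollMs = pollMs;
    }

    @Override
    public synchronized boolean publishMessage(String message, String topic) {
        if (delegate == null) {
            if (!awaitReady()) {
                return false; // broker did not come up within the timeout
            }
            delegate = delegateFactory.get();
        }
        return delegate.publishMessage(message, topic);
    }

    // Polls the readiness check until it passes, the timeout elapses, or the thread is interrupted.
    private boolean awaitReady() {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!brokerReady.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Other services would then depend on the wrapper rather than on the producer service directly, so the first `publishMessage` call pays the waiting cost instead of context startup.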

Alternatively, you can initialize the embedded Kafka by creating your own runner that inherits from (or wraps) the Spring runner and does the initialization before the Spring context is created.
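The runner itself needs JUnit and Spring on the classpath, so it is not shown here. The sketch below covers only the piece such a runner would need: a guard that starts the embedded Kafka once, however many test classes use the runner. `OneTimeStartup` is a hypothetical name, and the start action is whatever starts the embedded broker in your setup.

```java
// Hypothetical guard; not part of Spring, JUnit, or Kafka.
final class OneTimeStartup {

    private final Runnable startAction;
    private boolean started;

    OneTimeStartup(Runnable startAction) {
        this.startAction = startAction;
    }

    /**
     * Runs the start action on the first call only. Concurrent callers block until
     * the first start has finished. If the action throws, the guard stays unset,
     * so a later call tries again.
     */
    synchronized void ensureStarted() {
        if (!started) {
            startAction.run();
            started = true;
        }
    }

    synchronized boolean isStarted() {
        return started;
    }
}
```

A custom runner could hold one static `OneTimeStartup` and call `ensureStarted()` in its constructor, assuming the constructor runs before the test context is loaded. The EmbeddedKafka bean would then have to be removed from the test context, or made aware that the broker is already running.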

  • The problem is that Kafka and ZooKeeper need to be up before the afterPropertiesSet method of the KafkaProducerService is called, since that is where we try to connect to ZooKeeper. Increasing the timeout was the first thing that came to mind, but that's a little shaky since Kafka takes a different amount of time to start on different machines/environments. Still, there are good ideas in your answer and I will try them out, thank you! – mishless Apr 19 '18 at 07:45