I am using Spring for dependency injection and I have a bean that happens to be a Kafka producer service and it gets it's configuration like zookeeper server, etc. through properties file.
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Service
public class KafkaProducerService implements InitializingBean {
@Autowired
private Properties properties;
private KafkaProducer<String, String> producer;
private ZkUtils zkUtils;
public KafkaProducerService() {
}
@Override
public void afterPropertiesSet() throws Exception {
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.bootstrap.servers"));
kafkaProducerProperties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, properties.getProperty("kafka.producer.timeout", "3000"));
kafkaProducerProperties.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, properties.getProperty("kafka.reconnect.backoff.ms", "1000"));
kafkaProducerProperties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, properties.getProperty("kafka.producer.timeout", "3000"));
kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
String zookeeperEndpoint = properties.get("zookeeper.connect") + ":2181";
this.producer = new KafkaProducer<>(kafkaProducerProperties);
final ZkClient zkClient = new ZkClient(zookeeperEndpoint, 10000, 10000, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperEndpoint), false);
}
public boolean publishMessage(final String message, final String topic) {
try {
producer.send(new ProducerRecord<>(topic, message))
.get(3, TimeUnit.SECONDS);
return true;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return false;
}
}
public void tearDown() {
this.producer.close();
}
}
I am using this service by autowiring it into other services and that works well when running the application and working with it. I have additional spring context for tests that loads an embedded Kafka.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
<context:annotation-config/>
<context:component-scan base-package="my.project.main"/>
<context:property-placeholder location="classpath*:my_properties.properties"/>
<bean id="embeddedKafka" class="my.project.main.EmbeddedKafka"
init-method="setupEmbeddedKafkaWithZookeeper"
destroy-method="tearDown"/>
<bean id="properties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="properties">
<props>
<prop key="zookeeper.connect">localhost</prop>
<prop key="kafka.bootstrap.servers">localhost:9092</prop>
<prop key="acks">1</prop>
<prop key="kafka.producer.timeout">5000</prop>
<prop key="kafka.reconnect.backoff.ms">30000</prop>
</props>
</property>
</bean>
</beans>
Whenever I run the tests the kafka sevrice producer should be using the in memory kafka that is loaded through the EmbeddedKafka bean. The problem is that in most cases the embedded kafka takes too much time to start so the kafka producer times out and fails to be instantiated by Spring. Is there any mechanism to make the KafkaProducerService bean to "wait" until the EmbeddedKafka bean is instantiated?