I am trying to run Junit tests for my application, and trying to start the Kafka services as part of my tests.
But, I am getting the NullPointerException
after the zookeeper is started and the Kafka server ends abruptly.
Kafka Client Version: 2.4.1
Properties props = new Properties();
props.setProperty(KafkaConfig.HostNameProp(), "localhost");
props.setProperty(KafkaConfig.PortProp(), String.valueOf(KAFKA_BROKER_PORT));
props.setProperty(KafkaConfig.BrokerIdProp(), String.valueOf(KAFKA_BROKER_ID));
props.setProperty(KafkaConfig.LogDirProp(), Environment.getLogsDir());
props.setProperty(KafkaConfig.LogDirProp(), System.getProperty("java.io.tmpdir") + "/kafka-logs");
props.setProperty("enable.zookeeper", "false");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("zookeeper.connection.timeout.ms", "10000");
props.setProperty("offsets.topic.replication.factor", "1");
props.setProperty("log.flush.interval.messages", "1");
props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), "100000");
// flush every message.
props.setProperty("log.flush.interval", "1");
// flush every 1ms
props.setProperty("log.default.flush.scheduler.interval.ms", "1");
props.setProperty("log.flush.scheduler.interval.ms", "1");
kafkaServerStartable = KafkaServerStartable.fromProps(props);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
if (kafkaServerStartable != null) {
kafkaServerStartable.shutdown();
}
};
});
System.setProperty("zookeeper.sasl.clientconfig", "test");
System.setProperty("zookeeper.sasl.client", "false");
kafkaServerStartable.startup();
Any help/leads will be helpful. Thanks!
StackTrace/Logs:
2021-05-12 11:05:29,944 INFO [kafka.server.KafkaServer][main] [] starting
2021-05-12 11:05:29,948 INFO [kafka.server.KafkaServer][main] [] Connecting to zookeeper on localhost:2181
2021-05-12 11:05:30,024 INFO [kafka.zookeeper.ZooKeeperClient][main] [] [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181.
2021-05-12 11:05:30,027 INFO [org.apache.zookeeper.ZooKeeper][main] [] Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@29bc4bf4
2021-05-12 11:05:30,027 INFO [org.apache.zookeeper.ClientCnxnSocket][main] [] jute.maxbuffer value is 4194304 Bytes
2021-05-12 11:05:30,028 INFO [org.apache.zookeeper.ClientCnxn][main] [] zookeeper.request.timeout value is 0. feature enabled=
2021-05-12 11:05:30,034 INFO [kafka.zookeeper.ZooKeeperClient][main] [] [ZooKeeperClient Kafka server] Waiting until connected.
2021-05-12 11:05:30,035 INFO [org.apache.zookeeper.ClientCnxn][main-SendThread(localhost:2181)] [] Opening socket connection to server localhost/127.0.0.1:2181
2021-05-12 11:05:30,036 INFO [org.apache.zookeeper.ClientCnxn][main-SendThread(localhost:2181)] [] Socket connection established, initiating session, client: /127.0.0.1:59560, server: localhost/127.0.0.1:2181
2021-05-12 11:05:30,037 ERROR [org.apache.zookeeper.server.ZooKeeperSaslServer][NIOServerCxnFactory.SelectorThread-0] [] server principal name/hostname determination error:
java.lang.StringIndexOutOfBoundsException: String index out of range: -1
at java.lang.String.substring(String.java:1967)
at org.apache.zookeeper.util.SecurityUtils.createSaslServer(SecurityUtils.java:174)
at org.apache.zookeeper.server.ZooKeeperSaslServer.createSaslServer(ZooKeeperSaslServer.java:44)
at org.apache.zookeeper.server.ZooKeeperSaslServer.<init>(ZooKeeperSaslServer.java:38)
at org.apache.zookeeper.server.NIOServerCnxn.<init>(NIOServerCnxn.java:104)
at org.apache.zookeeper.server.NIOServerCnxnFactory.createConnection(NIOServerCnxnFactory.java:848)
at org.apache.zookeeper.server.NIOServerCnxnFactory$SelectorThread.processAcceptedConnections(NIOServerCnxnFactory.java:479)
at org.apache.zookeeper.server.NIOServerCnxnFactory$SelectorThread.run(NIOServerCnxnFactory.java:392)
2021-05-12 11:05:30,056 INFO [org.apache.zookeeper.ClientCnxn][main-SendThread(localhost:2181)] [] Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x10143dd4de40008, negotiated timeout = 6000
2021-05-12 11:05:30,065 INFO [kafka.zookeeper.ZooKeeperClient][main] [] [ZooKeeperClient Kafka server] Connected.
2021-05-12 11:05:30,478 ERROR [kafka.server.KafkaServer][main] [] Fatal error during KafkaServer startup. Prepare to shutdown
java.lang.NullPointerException
at com.fasterxml.jackson.databind.type.ResolvedRecursiveType.equals(ResolvedRecursiveType.java:103)
at com.fasterxml.jackson.databind.type.TypeBindings$AsKey.equals(TypeBindings.java:458)
at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:947)
at com.fasterxml.jackson.databind.util.LRUMap.get(LRUMap.java:68)
at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1211)
at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1380)
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
at com.fasterxml.jackson.databind.type.TypeFactory.constructType(TypeFactory.java:622)
at com.fasterxml.jackson.databind.introspect.AnnotatedClass.resolveType(AnnotatedClass.java:228)
at com.fasterxml.jackson.databind.introspect.AnnotatedConstructor.getParameterType(AnnotatedConstructor.java:104)
at com.fasterxml.jackson.databind.introspect.AnnotatedWithParams.getParameter(AnnotatedWithParams.java:105)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._addCreators(POJOPropertiesCollector.java:449)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:303)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueMethod(POJOPropertiesCollector.java:172)
at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueMethod(BasicBeanDescription.java:224)
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:350)
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.buildMapSerializer(BasicSerializerFactory.java:763)
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.buildContainerSerializer(BasicSerializerFactory.java:572)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:190)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:159)
at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1272)
at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1222)
at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:499)
at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:697)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:270)
at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3672)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3072)
at kafka.utils.Json$.encodeAsBytes(Json.scala:115)
at kafka.zk.ClusterIdZNode$.toJson(ZkData.scala:709)
at kafka.zk.KafkaZkClient.createOrGetClusterId(KafkaZkClient.scala:1524)
at kafka.server.KafkaServer$$anonfun$getOrGenerateClusterId$1.apply(KafkaServer.scala:404)
at kafka.server.KafkaServer$$anonfun$getOrGenerateClusterId$1.apply(KafkaServer.scala:404)
at scala.Option.getOrElse(Option.scala:121)
at kafka.server.KafkaServer.getOrGenerateClusterId(KafkaServer.scala:404)
at kafka.server.KafkaServer.startup(KafkaServer.scala:210)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
Tried debugging the class, found that the code throws the exception when it is trying to get the cluster id of the already connected (as per the logs) Zookeeper.