
I am trying to run JUnit tests for my application and start the Kafka services as part of those tests. However, after ZooKeeper starts, the Kafka server hits a NullPointerException and shuts down abruptly.

Kafka Client Version: 2.4.1

    Properties props = new Properties();
    props.setProperty(KafkaConfig.HostNameProp(), "localhost");
    props.setProperty(KafkaConfig.PortProp(), String.valueOf(KAFKA_BROKER_PORT));
    props.setProperty(KafkaConfig.BrokerIdProp(), String.valueOf(KAFKA_BROKER_ID));
    props.setProperty(KafkaConfig.LogDirProp(), Environment.getLogsDir());
    props.setProperty(KafkaConfig.LogDirProp(), System.getProperty("java.io.tmpdir") + "/kafka-logs");
    props.setProperty("enable.zookeeper", "false");
    props.setProperty("zookeeper.connect", "localhost:2181");
    props.setProperty("zookeeper.connection.timeout.ms", "10000");
    props.setProperty("offsets.topic.replication.factor", "1");
    props.setProperty("log.flush.interval.messages", "1");
    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), "100000");

    // flush every message.
    props.setProperty("log.flush.interval", "1");

    // flush every 1ms
    props.setProperty("log.default.flush.scheduler.interval.ms", "1");
    props.setProperty("log.flush.scheduler.interval.ms", "1");

    kafkaServerStartable = KafkaServerStartable.fromProps(props);

    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            if (kafkaServerStartable != null) {
                kafkaServerStartable.shutdown();
            }
        }
    });
    System.setProperty("zookeeper.sasl.clientconfig", "test");
    System.setProperty("zookeeper.sasl.client", "false");
    kafkaServerStartable.startup();
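
A side note on the snippet above: `KafkaConfig.LogDirProp()` is set twice, and `java.util.Properties` keeps only the last value written for a key. The sketch below illustrates that behavior in isolation. It assumes the key resolves to `log.dir`, and the class name, method name, and paths are hypothetical, chosen only for the demonstration.

```java
import java.util.Properties;

public class DuplicateKeyDemo {

    /** Sets the same key twice and returns the value that remains. */
    static String effectiveLogDir(String first, String second) {
        Properties props = new Properties();
        // Assumption: "log.dir" stands in for KafkaConfig.LogDirProp().
        props.setProperty("log.dir", first);
        // The second call replaces the first value; nothing is merged.
        props.setProperty("log.dir", second);
        return props.getProperty("log.dir");
    }

    public static void main(String[] args) {
        // Hypothetical paths, for illustration only.
        System.out.println(effectiveLogDir("/app/logs", "/tmp/kafka-logs"));
    }
}
```

If this holds for the broker properties too, the `Environment.getLogsDir()` line has no effect and the broker writes to the `java.io.tmpdir` location.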

Any help or leads would be appreciated. Thanks!

StackTrace/Logs:

2021-05-12 11:05:29,944 INFO [kafka.server.KafkaServer][main] [] starting
2021-05-12 11:05:29,948 INFO [kafka.server.KafkaServer][main] [] Connecting to zookeeper on localhost:2181
2021-05-12 11:05:30,024 INFO [kafka.zookeeper.ZooKeeperClient][main] [] [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181.
2021-05-12 11:05:30,027 INFO [org.apache.zookeeper.ZooKeeper][main] [] Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@29bc4bf4
2021-05-12 11:05:30,027 INFO [org.apache.zookeeper.ClientCnxnSocket][main] [] jute.maxbuffer value is 4194304 Bytes
2021-05-12 11:05:30,028 INFO [org.apache.zookeeper.ClientCnxn][main] [] zookeeper.request.timeout value is 0. feature enabled=
2021-05-12 11:05:30,034 INFO [kafka.zookeeper.ZooKeeperClient][main] [] [ZooKeeperClient Kafka server] Waiting until connected.
2021-05-12 11:05:30,035 INFO [org.apache.zookeeper.ClientCnxn][main-SendThread(localhost:2181)] [] Opening socket connection to server localhost/127.0.0.1:2181
2021-05-12 11:05:30,036 INFO [org.apache.zookeeper.ClientCnxn][main-SendThread(localhost:2181)] [] Socket connection established, initiating session, client: /127.0.0.1:59560, server: localhost/127.0.0.1:2181
2021-05-12 11:05:30,037 ERROR [org.apache.zookeeper.server.ZooKeeperSaslServer][NIOServerCxnFactory.SelectorThread-0] [] server principal name/hostname determination error: 
java.lang.StringIndexOutOfBoundsException: String index out of range: -1
    at java.lang.String.substring(String.java:1967)
    at org.apache.zookeeper.util.SecurityUtils.createSaslServer(SecurityUtils.java:174)
    at org.apache.zookeeper.server.ZooKeeperSaslServer.createSaslServer(ZooKeeperSaslServer.java:44)
    at org.apache.zookeeper.server.ZooKeeperSaslServer.<init>(ZooKeeperSaslServer.java:38)
    at org.apache.zookeeper.server.NIOServerCnxn.<init>(NIOServerCnxn.java:104)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.createConnection(NIOServerCnxnFactory.java:848)
    at org.apache.zookeeper.server.NIOServerCnxnFactory$SelectorThread.processAcceptedConnections(NIOServerCnxnFactory.java:479)
    at org.apache.zookeeper.server.NIOServerCnxnFactory$SelectorThread.run(NIOServerCnxnFactory.java:392)
2021-05-12 11:05:30,056 INFO [org.apache.zookeeper.ClientCnxn][main-SendThread(localhost:2181)] [] Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x10143dd4de40008, negotiated timeout = 6000
2021-05-12 11:05:30,065 INFO [kafka.zookeeper.ZooKeeperClient][main] [] [ZooKeeperClient Kafka server] Connected.
2021-05-12 11:05:30,478 ERROR [kafka.server.KafkaServer][main] [] Fatal error during KafkaServer startup. Prepare to shutdown
java.lang.NullPointerException
    at com.fasterxml.jackson.databind.type.ResolvedRecursiveType.equals(ResolvedRecursiveType.java:103)
    at com.fasterxml.jackson.databind.type.TypeBindings$AsKey.equals(TypeBindings.java:458)
    at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:947)
    at com.fasterxml.jackson.databind.util.LRUMap.get(LRUMap.java:68)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1211)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
    at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
    at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1380)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
    at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
    at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
    at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
    at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
    at com.fasterxml.jackson.databind.type.TypeFactory._resolveSuperInterfaces(TypeFactory.java:1298)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromClass(TypeFactory.java:1243)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromParamType(TypeFactory.java:1384)
    at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1154)
    at com.fasterxml.jackson.databind.type.TypeFactory.constructType(TypeFactory.java:622)
    at com.fasterxml.jackson.databind.introspect.AnnotatedClass.resolveType(AnnotatedClass.java:228)
    at com.fasterxml.jackson.databind.introspect.AnnotatedConstructor.getParameterType(AnnotatedConstructor.java:104)
    at com.fasterxml.jackson.databind.introspect.AnnotatedWithParams.getParameter(AnnotatedWithParams.java:105)
    at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._addCreators(POJOPropertiesCollector.java:449)
    at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:303)
    at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueMethod(POJOPropertiesCollector.java:172)
    at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueMethod(BasicBeanDescription.java:224)
    at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:350)
    at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.buildMapSerializer(BasicSerializerFactory.java:763)
    at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.buildContainerSerializer(BasicSerializerFactory.java:572)
    at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:190)
    at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:159)
    at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1272)
    at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1222)
    at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:499)
    at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:697)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:270)
    at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3672)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3072)
    at kafka.utils.Json$.encodeAsBytes(Json.scala:115)
    at kafka.zk.ClusterIdZNode$.toJson(ZkData.scala:709)
    at kafka.zk.KafkaZkClient.createOrGetClusterId(KafkaZkClient.scala:1524)
    at kafka.server.KafkaServer$$anonfun$getOrGenerateClusterId$1.apply(KafkaServer.scala:404)
    at kafka.server.KafkaServer$$anonfun$getOrGenerateClusterId$1.apply(KafkaServer.scala:404)
    at scala.Option.getOrElse(Option.scala:121)
    at kafka.server.KafkaServer.getOrGenerateClusterId(KafkaServer.scala:404)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:210)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)

While debugging, I found that the exception is thrown when the broker tries to get the cluster ID from ZooKeeper, to which it is already connected according to the logs.
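
The trace above shows the NullPointerException originating inside Jackson (`ResolvedRecursiveType.equals`), reached from `kafka.utils.Json$.encodeAsBytes` while the cluster ID is serialized. One possible explanation is that the test classpath resolves a different Jackson version than the one the Kafka broker was built against. This is an assumption, not confirmed by anything here. Below is a minimal sketch for checking which jars those classes actually load from. `ClasspathProbe` and `locate` are hypothetical names, and the reported version depends on the jar manifest, so it may be null.

```java
import java.security.CodeSource;

public class ClasspathProbe {

    /** Describes where the named class is loaded from, or why that is unknown. */
    static String locate(String className) {
        try {
            // Load without initializing, so static initializers do not run.
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                // Typically the case for classes that ship with the JDK.
                return className + " -> no code source available";
            }
            Package pkg = cls.getPackage();
            String version = (pkg == null) ? null : pkg.getImplementationVersion();
            return className + " -> " + src.getLocation() + " (manifest version: " + version + ")";
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not on the classpath";
        }
    }

    public static void main(String[] args) {
        // Classes that appear in, or sit behind, the stack trace.
        System.out.println(locate("com.fasterxml.jackson.databind.ObjectMapper"));
        System.out.println(locate("com.fasterxml.jackson.core.JsonFactory"));
        System.out.println(locate("com.fasterxml.jackson.module.scala.DefaultScalaModule"));
        System.out.println(locate("kafka.server.KafkaServer"));
    }
}
```

Running this from the same test classpath would show whether the Jackson jars come from the Kafka dependency tree or from another dependency that pins an older version.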

Akshit Gupta
  • Please show your code and your broker settings – OneCricketeer May 13 '21 at 12:52
  • @OneCricketeer Edited the question to include the code used to start Kafka. – Akshit Gupta May 17 '21 at 06:35
  • Have you tried using the spring-kafka-test embedded broker? Or other kafka-junit projects found on Github? Or testcontainers? – OneCricketeer May 17 '21 at 11:43
  • I suspect you have not set up SASL correctly but have still passed ZooKeeper some SASL properties (the error is related to SASL configuration). https://stackoverflow.com/questions/43469962/kafka-sasl-zookeeper-authentication Also, you have set `enable.zookeeper=false` on the broker, which does nothing in Kafka 2.4 – OneCricketeer May 17 '21 at 11:51
