
I'm new to Kafka and am trying to use the AdminClient API to manage the Kafka server running on my local machine. I have it set up exactly as described in the quick start section of the Kafka documentation; the only difference is that I have not created any topics.

The shell scripts all run without issues on this setup, but the following Java code fails:

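import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class ProducerMain {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
            try {
                // Topic "test" with 1 partition and a replication factor of 1
                final NewTopic newTopic = new NewTopic("test", 1, (short) 1);

                final CreateTopicsResult createTopicsResult =
                        adminClient.createTopics(Collections.singleton(newTopic));

                // Block until the broker responds to the create request
                createTopicsResult.all().get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}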

Error: TimeoutException: Timed out waiting for a node assignment

Exception in thread "main" java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at ProducerMain.main(ProducerMain.java:41)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
    at ProducerMain.main(ProducerMain.java:38)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

I have searched online for an indication as to what the problem could be but have found nothing so far. Any suggestions are welcome as I am at the end of my rope.

Ryuzaki L
Spazoide
  • Did you find a solution to your issue? I'm getting the same error when I run the KafkaReadyCommand against a deployed clustered broker, and cannot figure out what happens. – Gaetan Oct 08 '18 at 09:30
  • @Gaetan A system restart ended up fixing the problem for me since I was running Kafka on my local machine. I still don't know what the root of the problem was however. Sorry I couldn't be more helpful. – Spazoide Oct 09 '18 at 15:38

2 Answers


Sounds like your broker isn't healthy.

This code works fine for me:

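import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {

    static final Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, "local-test");
        properties.setProperty(AdminClientConfig.RETRIES_CONFIG, "3");

        try (AdminClient client = AdminClient.create(properties)) {
            final CreateTopicsResult res = client.createTopics(
                    Collections.singletonList(
                            new NewTopic("foo", 1, (short) 1)
                    )
            );
            // Wait at most 5 seconds for the broker to respond
            res.all().get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            logger.error("unable to create topic", e);
        }
    }
}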

And I can see in the broker logs that the topic was created.
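To test the "broker isn't healthy" theory before involving AdminClient at all, a plain TCP connection attempt against the bootstrap address can help. The following is only a minimal sketch using the JDK, not part of the Kafka API: the class name BrokerPortCheck, the method isReachable, and the 2-second timeout are all made up for illustration. A successful connect only shows that something is listening on the port; it does not prove the broker is a working Kafka node or that it advertises an address the client can reach.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not a Kafka class): checks whether a TCP port accepts connections.
public class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs milliseconds.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, connect timeout, and unresolvable host names
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults match the bootstrap address used in the question; override via arguments
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

If this reports the port as unreachable, the problem is likely at the network or process level (broker not running, wrong port, firewall) rather than in the AdminClient code.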

OneCricketeer

I started the Kafka service with the bitnami/kafka image and got exactly the same error. Starting Kafka with the wurstmeister/kafka image instead worked for me: https://hub.docker.com/r/wurstmeister/kafka

$ docker run -d --name zookeeper-server --network app-tier \
  -e ALLOW_ANONYMOUS_LOGIN=yes  -p 2181:2181 zookeeper:3.6.2

$ docker run -d --name kafka-server --network app-tier --publish 9092:9092 \
  --env KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
  --env KAFKA_ADVERTISED_HOST_NAME=30.225.51.235 \
  --env KAFKA_ADVERTISED_PORT=9092  \
  wurstmeister/kafka

30.225.51.235 is the IP address of the host machine.

david euler