
I'm getting the error:

 org.apache.kafka.common.errors.TimeoutException: Topic testtopic2 not present in metadata after 60000 ms.

When trying to produce to the topic in my local Kafka instance on Windows using Java. Note that the topic testtopic2 exists and I'm able to produce messages to it using the Windows console producer just fine.

Below is the code that I'm using:

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Kafka_Producer {

    public static void main(String[] args){

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        TestCallback callback = new TestCallback();
        for (long i = 0; i < 100 ; i++) {
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                    "testtopic2", "key-" + i, "message-"+i );
            producer.send(data, callback);
        }

        producer.close();
    }


    private static class TestCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                System.out.println("Error while producing message to topic :" + recordMetadata);
                e.printStackTrace();
            } else {
                String message = String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                System.out.println(message);
            }
        }
    }

}

Pom dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

Output of list and describe:

[screenshot: output of list topics]

[screenshot: output of describe testtopic2]

Gherbi Hicham
    Can you show the output of the topic creation and list topics command? – OneCricketeer Sep 03 '20 at 18:16
  • @OneCricketeer sure, I have added the output of list and describe topic to the question, I set the replication factor to 1 as I'm working with one broker instance and the number of partitions to 3. – Gherbi Hicham Sep 03 '20 at 19:39

17 Answers

20

I was having this same problem today. I'm a newbie at Kafka and was simply trying to get a sample Java producer and consumer running. I was able to get the consumer working, but kept getting the same "topic not present in metadata" error as you, with the producer.

Finally, out of desperation, I added some code to my producer to dump the topics. When I did this, I got runtime errors because of missing classes from the jackson-databind and jackson-core packages. After adding them, I no longer got the "topic not present" error. I removed the topic-dumping code I had temporarily added, and it still worked.
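For reference, the missing classes can be pulled in explicitly in the pom (these are the standard Jackson coordinates; the versions below are illustrative, so pick ones compatible with your kafka-clients version):

```xml
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.5.1</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.10.5</version>
</dependency>
```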

Bob Dobbs
    I spent a whole day struggling with this..Thank you very much – aravind Nov 17 '20 at 06:08
  • 16
    @Bob Dobbs can you please share the code changes made for this solution? – Parveen Verma Oct 29 '21 at 10:36
  • Kafka 2.8.0, I think, was missing Jackson classes. It's recommended to downgrade – OneCricketeer Jan 24 '22 at 14:05
  • I was having this same issue in local environment running the landoop image in docker. After spending a lot of time researching the root cause, in my case I was trying to post a message to a non-existent partition. When I omitted the partition in the settings it worked fine – Thiago Matar Mar 29 '22 at 03:03
  • We built a LogAppender for logback in a spring setup and it kept getting stuck at the first log message in the boot process. Adding a seemingly useless `List result = producer.partitionsFor(topic);` after the creation of the KafkaProducer solved it. Probably a race condition in the constructor of the KafkaProducer - a standalone test with the same properties worked just fine. – dube Apr 13 '22 at 06:06
15

This error can also appear because the destination Kafka instance has "died" or the URL to it is wrong.

In such a case, the thread that sends the message to Kafka will block for max.block.ms, which defaults to exactly 60000 ms.

You can check whether it is because of above property by passing changed value:

Properties props = new Properties();
// ... (among others)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000); // 30 sec, or any other value of your choice

If the TimeoutException is then thrown after your specified time, you should check whether your URL to Kafka is correct and whether the Kafka instance is alive.

Dmitriy Fialkovskiy
11

It might also be caused by a nonexistent partition.

e.g. If you have a single partition [0] and your producer tries to send to partition [1], you'll get the same error. The topic exists in this case, but not the partition.
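As a quick sanity check: a topic with N partitions only has partition ids 0 through N-1. A hypothetical helper illustrating the rule:

```java
public class PartitionCheck {
    // Valid partition ids for a topic with partitionCount partitions are 0..partitionCount-1.
    static boolean isValidPartition(int partition, int partitionCount) {
        return partition >= 0 && partition < partitionCount;
    }

    public static void main(String[] args) {
        // A single-partition topic only has partition [0], so sending to [1] fails.
        System.out.println(isValidPartition(0, 1)); // true
        System.out.println(isValidPartition(1, 1)); // false
    }
}
```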

Federico Nafria
5

First off I want to say thanks to Bob Dobbs for his answer; I was also struggling with this for a while today. I just want to add that the only dependency I had to add was jackson-databind. It is the only dependency in my project besides kafka-clients.

Update: I've learned a bit more about what's going on. kafka-clients sets the scope of its jackson-databind dependency to "provided," which means it expects it to be provided at runtime by the JDK or a container. See this article for more details on the provided Maven scope:

This scope is used to mark dependencies that should be provided at runtime by JDK or a container, hence the name. A good use case for this scope would be a web application deployed in some container, where the container already provides some libraries itself.

I'm not sure of the exact reasoning for setting its scope to provided, except that maybe this library is something people would normally want to provide themselves, to keep it up to date with the latest security fixes, etc.

Brandon
    Try adding `jackson-databind` to your project dependencies. If that doesn't work, try also adding `jackson-core`. If that doesn't work, I'm not sure what else to try. – Brandon Feb 12 '21 at 18:56
  • This exceptions appears on all the occurrence or it appears sometimes? – user3082820 Dec 08 '21 at 14:05
  • @user3082820 If I remember correctly, the exception happened every time I tried to produce a message to a kafka topic. – Brandon Dec 12 '21 at 00:56
    @Brandon, jackson-core is a transitive dependency of jackson-databind, so how can explicitly adding jackson-core help here? Please explain. – Mikhail2048 Mar 23 '22 at 08:08
  • I don't know if it would help, then. I suppose some other dependency could require a later version of jackson-core (using the "provided" scope) than the version of jackson-databind being used provides, but I don't know if that's what's happening for you, if you're having this issue. – Brandon Mar 24 '22 at 16:18
5

I saw this issue when someone on my team changed the value of the spring.kafka.security.protocol config (we are using Spring on my project). Previously it had been "SSL" in our config, but it was updated to PLAINTEXT. In higher environments, where we connect to a cluster that uses SSL, we saw the error the OP ran into.

Why we saw this error, as opposed to an SSL error or an authentication error, is beyond me, but if you run into it, it may be worth double-checking your client's authentication configs for your Kafka cluster.

phlogiston
  • I had the same issue; I was able to solve it by setting "security.protocol": "ssl" for the producer. – ranjeet May 24 '22 at 19:48
  • Thanks, I spent hours looking at my cluster configs. In the end, I had a typo in the "security.protocol" key in my producer config. This error message didn't help at all... – kamiha Jun 15 '22 at 01:01
4

I also had a similar issue while trying this in my local environment on my MacBook. It was quite frustrating, and I tried a few approaches:

  1. Stopped ZooKeeper, stopped Kafka, then restarted ZK and Kafka. (Didn't help.)
  2. Stopped ZK, deleted the ZK data directory, deleted Kafka's log.dirs, and restarted Kafka. (Didn't help.)
  3. Restarted my MacBook - this did the trick.

I have used Kafka in production for more than 3 years and never faced this problem on a cluster; it happened only in my local environment. However, a restart fixes it for me.

java_enthu
2

This error is only a surface symptom; it may be triggered by the following underlying conditions.

  1. First, and most common: your Kafka producer config is wrong. Check that BOOTSTRAP_SERVERS_CONFIG in your Kafka properties is the correct server address.
  2. In a Docker environment, check your port mapping.
  3. Check whether the firewall has opened port 9092 on the server where the broker is located.
  4. If your broker runs with SSL, check your producer config for SSL_TRUSTSTORE_LOCATION_CONFIG, SECURITY_PROTOCOL_CONFIG, and SSL_TRUSTSTORE_TYPE_CONFIG. Also, some brokers listen on both SSL and PLAINTEXT ports; make sure you are using the one you need.
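For condition 4, a minimal client-side SSL properties sketch (the property names are Kafka's standard ones; the host, port, paths, and password are placeholders):

```properties
# Point the producer at the broker's SSL listener, not the PLAINTEXT one
bootstrap.servers=broker-host:9093
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=changeit
ssl.truststore.type=JKS
```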
1
  1. I created a topic with a single partition and tried to populate it as if it had 10 partitions. That produced this issue.

  2. I deleted the topic using the kafka-topics.sh script but didn't wait long enough for the cleanup to finish before I started repopulating the topic. Looking at the topic metadata, it had one partition, and I got exactly the same issue as mentioned in the first part of this answer.

Steephen
0

You may want to check your producer properties for metadata.max.idle.ms.

The producer caches metadata for as long as the value configured above. Any change to the metadata on the broker side will not be visible to the client (producer) immediately. Restarting the producer, however, re-reads the metadata at startup.

Update: check the default value here: https://kafka.apache.org/documentation.html#metadata.max.idle.ms
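A sketch of overriding it (the property name is the standard producer config; the value is illustrative):

```properties
# Re-fetch metadata for idle topics after 60 s instead of the default (see link above)
metadata.max.idle.ms=60000
```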

OneCricketeer
kazzaki
0

Note that this could also happen because the versions of kafka-clients and Spring are not compatible.

See the "Kafka Client Compatibility" matrix at https://spring.io/projects/spring-kafka for more info.

void
0

First, try to create the topic in Kafka using the command below:

kafka-topics --bootstrap-server 127.0.0.1:9092 --topic my_first --create --partitions 3

Here my_first is the topic name.

0
  1. Add the two dependencies in the pom: kafka-streams and spring-kafka
  2. In application.yml (or properties):

    spring:
      kafka:
        bootstrap-servers: <service_url/bootstrap_server_url>
        producer:
          bootstrap-servers: <service_url/bootstrap_server_url>
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: <your_consumer_id>

    (Note: group-id is a consumer property, so it belongs under consumer, not producer.)

  3. Add the @EnableKafka annotation to your @SpringBootApplication class.

This will make it work without any errors.

OneCricketeer
0

I was facing the same issue. It can happen when your bootstrap or registry URL is wrong or unreachable.

lmo
0

In case you came here with the same error while setting up integration tests using Testcontainers: this can happen because of the port used by Kafka inside the container versus the port exposed outside. Make sure the bootstrap server port the broker starts with is correctly mapped to the exposed port you use in your tests.

In my case, I just replaced the properties file entries after the Kafka container started:

KafkaContainer kafka = new KafkaContainer(...);
kafka.start();

String brokers = kafka.getBootstrapServers();
TestPropertySourceUtils.addInlinedPropertiesToEnvironment(context,
    "spring.kafka.bootstrap-servers=" + brokers,
    "spring.kafka.producer.bootstrap-servers=" + brokers,
    "spring.kafka.consumer.bootstrap-servers=" + brokers
);

Spent quite some time before I figured it out, hope this helps someone.

Rail Yulgutlin
  • Alternatively, you can use the property `spring.kafka.properties.bootstrap.servers` which does all three in one shot. – Simon Jan 25 '23 at 12:59
0

I was having the same problem, and it was because of a wrong config. Here's my producer configuration that worked. Replace the ${} placeholders with your own values, and don't forget to set all the properties:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ${servers});
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("basic.auth.user.info", "${user}:${pass}");
    props.put("sasl.kerberos.service.name", "kafka");
    props.put("auto.register.schemas", "false");
    props.put("schema.registry.url", "${https://your_url}");
    props.put("schema.registry.ssl.truststore.location", "client_truststore.jks");
    props.put("schema.registry.ssl.truststore.password", "${password}");

    KafkaProducer<String, ClassEvent> producer = new KafkaProducer<>(props);

    ClassEvent event = getEventObjectData();

    ProducerRecord<String, ClassEvent> record = new ProducerRecord<String, ClassEvent>(args[0], event);

Execution from cluster:

java -Djava.security.auth.login.config=${jaas.conf} -cp ${your-producer-example.jar} ${your.package.class.ClassName} ${topic}

Hope it helps

helvete
0

In the case of reading a stream from one topic and writing it to another, a possible solution could be:

<dataset being read>.drop("partition")

Explanation: each row in the dataframe being read comes with the source's partition column. If the source topic has more partitions than the destination topic, the writer will try to write each row to the same-numbered partition in the destination. If that partition doesn't exist on the destination topic, you get this error.

I was also able to obtain a more comprehensive version of the error when deployed in cluster mode: "Partition 22 of topic with partition count 3 is not present in metadata after 60000 ms."

The solution is to either drop the partition column and let Kafka choose the partition itself, or replace the original partition number with a valid one (for example, modulo the number of destination partitions).
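The modulo remapping mentioned above can be sketched in plain Java (the class and method names are hypothetical):

```java
public class PartitionRemap {
    // Map a source partition id onto a destination topic with fewer partitions.
    static int remap(int sourcePartition, int destPartitionCount) {
        return Math.floorMod(sourcePartition, destPartitionCount);
    }

    public static void main(String[] args) {
        // e.g. source partition 22 with a 3-partition destination lands on partition 1
        System.out.println(remap(22, 3));
    }
}
```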

0

I faced the same issue. It was simply that the producer wasn't able to connect to the bootstrap server; my problem was related to the JKS truststore for the SSL configuration. Once I configured it correctly, everything started working as usual.

Hany Sakr