2

I am using Apache Kafka v3.0.0. It doesn't require ZooKeeper.

I am trying to create a topic with 3 partitions. I know how to do that via the bash console, and it works:

/bin/kafka-topics --alter --topic demo-topic1 --bootstrap-server kafka:9092 --partitions 3

I tried setting it in the producer properties with `properties.put("partitions", 3);`, but it doesn't work. I also tried to specify it when creating a `ProducerRecord`, but I get an error:

`ProducerRecord<Integer, String> record = new ProducerRecord<>("demo-topic1", 3, i % 3, "" + i);` where `"demo-topic1"` is the name of my topic, `3` is the number of my partitions, `i % 3` is the key, and `"" + i` is the value. Here is the full code of my Producer class:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * Demo producer that publishes 10000 integer-keyed string messages to
 * {@code demo-topic1}, one per second, and logs the metadata of each
 * acknowledged record.
 *
 * <p>The number of partitions is a property of the topic, not of the producer
 * or of an individual record. It has to be configured on the broker side
 * (for example with {@code kafka-topics --partitions 3} or through the admin
 * client) before this program runs. This class only chooses a <em>key</em>
 * for every record and lets the configured partitioner map that key to one
 * of the partitions that actually exist.
 */
public class Producer {
    /** Topic the demo messages are written to. */
    private static final String TOPIC = "demo-topic1";
    /** Number of distinct keys the messages are cycled through. */
    private static final int KEY_COUNT = 3;
    /** Total number of messages to send. */
    private static final int MESSAGE_COUNT = 10000;
    /** Pause between two consecutive sends, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 1000;

    /**
     * Sends {@link #MESSAGE_COUNT} records to {@link #TOPIC}.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while pausing between sends
     */
    public static void main(String[] args) throws InterruptedException {
        Logger logger = LoggerFactory.getLogger(Producer.class);
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources guarantees the producer is closed (which waits for
        // previously sent records to complete) even if the sleep is interrupted.
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Use the (topic, key, value) constructor. The second argument of the
                // four-argument constructor is a zero-based partition INDEX, not a
                // partition count; passing 3 addresses a fourth partition, which a
                // 3-partition topic does not have.
                ProducerRecord<Integer, String> record =
                        new ProducerRecord<>(TOPIC, i % KEY_COUNT, "" + i);
                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        logger.info("received new metadata, topic {} partition {} offsets {} time {}",
                                metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());
                    } else {
                        logger.error("error producing", exception);
                    }
                });
                Thread.sleep(SEND_INTERVAL_MS);
            }
            // Push out anything still buffered before the producer is closed.
            producer.flush();
        }
    }

}

I hope somebody can help me.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
Max Prime
  • 39
  • 7
  • 1
    Would be nice if we could to see the exact error message, perhaps together with the full stacktrace … – tquadrat Dec 22 '21 at 08:27
  • 1
    1) Always use `ProducerConfig` enums for producer clients. 2) To create topic use `AdminClient` with `AdminClientConfig` and `NewTopic` classes, not a producer object or its properties – OneCricketeer Dec 22 '21 at 19:58
  • Thank you @OneCricketeer I find out how to do that [link](https://stackoverflow.com/questions/58505154/increase-number-of-partitions-for-a-topic-in-java) . Now I know that I must use `AdminClient` to start. – Max Prime Dec 23 '21 at 12:19

0 Answers0