I am using Apache Kafka v3.0.0. It doesn't require ZooKeeper.
I am trying to create a topic with 3 partitons. I know how to do that via bash console and it works.
/bin/kafka-topics --alter --topic demo-topic1 --bootstrap-server kafka:9092 --partitions 3
I was trying to put it in properties properties.put("partitions", 3);
and it doesn't work.
Also I was trying to create a ProducerRecord but I get an error.
ProducerRecord<Integer, String> record = new ProducerRecord<>("demo-topic1", 3, i % 3, "" + i);
where "demo-topic1" is the name of my topic
3 is the quantity of my partitons
i % 3 is a key
"" + i is a value
My full code of Producer class
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class Producer {
public static void main(String[] args) throws InterruptedException {
Logger logger = LoggerFactory.getLogger(Producer.class);
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
//properties.put("partitions", 3);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 10000; i++) {
ProducerRecord<Integer, String> record = new ProducerRecord<>("demo-topic1", 3,i % 3, "" + i);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
logger.info("received new metadata, topic " + metadata.topic() + " partition " +
metadata.partition() + " offsets " + metadata.offset() + " time " + metadata.timestamp());
} else {
logger.error("error producing", exception);
}
});
Thread.sleep(1000);
}
producer.flush(); //cleaning producer
producer.close();
}
}
Hope sb can help me.