
I followed the instructions below to set up a multi-node Kafka cluster. Now, how do I connect to ZooKeeper? Is it okay to connect to just one ZooKeeper node from the producer/consumer side in Java, or is there a way to connect to all the ZooKeeper nodes?

Setting up a multi-node Apache ZooKeeper cluster

On every node of the cluster, add the following lines to the file kafka/config/zookeeper.properties:

    server.1=zNode01:2888:3888
    server.2=zNode02:2888:3888
    server.3=zNode03:2888:3888
    #add here more servers if you want
    initLimit=5
    syncLimit=2

On every node of the cluster, create a file called myid in the folder given by the dataDir property (by default /tmp/zookeeper). The myid file should contain only the id of the node ('1' for zNode01, '2' for zNode02, etc.).
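For example, with the default dataDir and the hostnames used above, the myid files would be (each file holds a single number and nothing else):

    /tmp/zookeeper/myid on zNode01 contains: 1
    /tmp/zookeeper/myid on zNode02 contains: 2
    /tmp/zookeeper/myid on zNode03 contains: 3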

Setting up a multi-broker Apache Kafka cluster

On every node of the cluster, modify the property zookeeper.connect in the file kafka/config/server.properties:

    zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181
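As an aside, the connect string is just a comma-separated host:port list, so you can assemble it programmatically if your node list lives elsewhere. A small helper sketch (ConnectString is a hypothetical name, not part of Kafka; the same format works for bootstrap.servers with port 9092):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ConnectString {
    // Build the comma-separated host:port list that Kafka expects for
    // zookeeper.connect (and, analogously, bootstrap.servers).
    static String build(List<String> hosts, int port) {
        return hosts.stream()
                .map(h -> h + ":" + port)
                .collect(Collectors.joining(","));
    }
}
```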

On every node of the cluster, set the property host.name in the file kafka/config/server.properties to that node's own hostname: host.name=zNode0x

On every node of the cluster, set the property broker.id in the file kafka/config/server.properties (every broker in the cluster must have a unique id).
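Putting the three broker-side properties together, kafka/config/server.properties on zNode02, for example, would contain:

    broker.id=2
    host.name=zNode02
    zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181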

amateur

2 Answers


You can pass all the nodes to the producer or consumer. Kafka is intelligent enough that it will connect to the broker that holds the data you need, based on the partition and its replicas.

Here is the consumer code:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
    }

You can find more info here

Note: the problem with this approach is that it may open multiple connections to find out which node holds the data. For more robust and scalable systems you can maintain a map of partition number to node name; this helps with load balancing as well.
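A minimal sketch of such a map. In a real client you would fill it from consumer.partitionsFor(topic), where each PartitionInfo gives you partition() and leader().host(); the key-to-partition hashing below mimics Kafka's default hash partitioning but is a simplification, and PartitionRouter is a hypothetical name:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionRouter {
    // Hypothetical cache of partition -> broker host. Populate it from
    // consumer.partitionsFor("my-topic") using PartitionInfo.partition()
    // and PartitionInfo.leader().host().
    private final Map<Integer, String> partitionToNode = new HashMap<>();
    private final int numPartitions;

    public PartitionRouter(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    public void register(int partition, String host) {
        partitionToNode.put(partition, host);
    }

    // Map a key to a partition: a simplified version of Kafka's default
    // hash-based partitioning (the mask keeps the value non-negative).
    public int partitionFor(String key) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Which broker currently leads the partition this key maps to.
    public String nodeFor(String key) {
        return partitionToNode.get(partitionFor(key));
    }
}
```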

Here is the producer sample:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

    producer.close();

more info here

Shettyh
  • What about creating multiple partitions for a topic? how can this be done? Don't we need to delegate that through the ZkClient like this ? Discussed here: http://stackoverflow.com/questions/27036923/how-to-create-a-topic-in-kafka-through-java – amateur Jul 26 '16 at 17:25
  • AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration); – amateur Jul 26 '16 at 17:27
  • You can use AdminUtils to create topics.. But the better practice is to create it in the node itself with command coz it's a onetime task.. Command format /bin/kafka-topics.sh --zookeeper c6401.ambari.apache.org:2181 --create --topic test_topic --partitions 2 --replication-factor 2 Created topic "test_topic". – Shettyh Jul 26 '16 at 18:10
  • I have to create topics dynamically when i send the data over to producer.. – amateur Jul 26 '16 at 18:52
  • @amateur http://stackoverflow.com/questions/36364872/creating-a-topic-for-apache-kafka-0-9-using-java check this – Shettyh Jul 27 '16 at 04:26

No need to pass ZooKeeper connection properties to the Kafka clients (producer and consumer).

From Kafka 0.9 onwards, the Kafka producer and consumer don't communicate with ZooKeeper.
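In other words, the new clients need only bootstrap.servers; zookeeper.connect belongs to the old Scala consumer and to the brokers themselves. A minimal new-consumer configuration would look like this (port 9092 assumed for the brokers):

    bootstrap.servers=zNode01:9092,zNode02:9092,zNode03:9092
    group.id=test
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer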

Kamal Chandraprakash
  • I'm using v9, I'm getting the following exception, that zookeeper info is required in the properties .. – amateur Jul 31 '16 at 19:23
  • Caused by: java.lang.IllegalArgumentException: requirement failed: Missing required property 'zookeeper.connect' at scala.Predef$.require(Predef.scala:233) at kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177) at kafka.utils.ZKConfig.(ZkUtils.scala:740) – amateur Jul 31 '16 at 19:24
  • Use KafkaProducer and KafkaConsumer from the `kafka-clients` library. – Kamal Chandraprakash Aug 01 '16 at 12:28
  • The only dependency to access a Kafka Broker / Server is `kafka-clients` jar. Remove all the other jars / dependencies from your project. And, then test again. – Kamal Chandraprakash Aug 02 '16 at 04:12