
I still have doubts about Kafka's ZOOKEPER_AUTO_RESET. I have seen a lot of questions asked in this regard. Kindly excuse me if this is a duplicate query.

I have a high-level Java consumer which keeps on consuming. I have multiple topics, and all topics have a single partition.

My concern is the behaviour below.

I started consumerkafka.jar with the consumer group name “ncdev1” and ZOOKEPER_AUTO_RESET = smallest. I could observe that the initial offset is set to -1. Then I stopped/started the jar after some time. At this point, it picks up the latest offset assigned to the consumer group (ncdev1), i.e. 36. I restarted again after some time, and then the initial offset is set to 39, which is the latest value.

Then I changed the group name to ZOOKEPER_GROUP_ID = ncdev2 and restarted the jar file. This time the offset is again set to -1. On further restarts, it jumped to the latest value, i.e. 39.

Then I set
ZOOKEPER_AUTO_RESET = largest and ZOOKEPER_GROUP_ID = ncdev3

Then I tried restarting the jar file with group name ncdev3. There is no difference in the way it picks the offset on restart: it picks 39, the same as with the previous configuration.

Any idea why it is not picking the offset from the beginning? Is there any other configuration to be done to make it read from the beginning? (My understanding of largest and smallest comes from What determines Kafka consumer offset?)

Thanks in advance.

Code added:

public class ConsumerForKafka {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;
    ServerSocket soketToWrite;
    Socket s_Accept ;
    OutputStream s1out ;
    DataOutputStream dos;
    static boolean logEnabled ;
    static File fileName;


    private static final Logger logger = Logger.getLogger(ConsumerForKafka.class);


    public ConsumerForKafka(String a_zookeeper, String a_groupId, String a_topic,String session_timeout,String auto_reset,String a_commitEnable) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId,session_timeout,auto_reset,a_commitEnable));
        this.topic =a_topic;
    }


    public void run(int a_numThreads,String a_zookeeper,  String a_topic) throws InterruptedException, IOException {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        String socketURL = PropertyUtils.getProperty("SOCKET_CONNECT_HOST");  
        int socketPort = Integer.parseInt(PropertyUtils.getProperty("SOCKET_CONNECT_PORT")); 
        Socket socks = new Socket(socketURL,socketPort);        

        //****
        String keeper = a_zookeeper;
        String topic = a_topic;

        long millis = new java.util.Date().getTime();

        //****

        PrintWriter outWriter = new PrintWriter(socks.getOutputStream(), true);

        List<KafkaStream<byte[], byte[]>> streams = null;
        // now create an object to consume the messages
        //
        int threadNumber = 0;
       // System.out.println("going to forTopic value is "+topic);
        boolean keepRunningThread =false;
        boolean chcek = false;
        logger.info("logged");
        BufferedWriter bw = null;
        FileWriter fw = null;
        if(logEnabled){
            fw = new FileWriter(fileName, true);
            bw = new BufferedWriter(fw);
        }

        for (;;) {


            streams = consumerMap.get(topic);
            keepRunningThread =true;

            for (final KafkaStream stream : streams) {

                ConsumerIterator<byte[], byte[]> it = stream.iterator();

                while(keepRunningThread) 
                {

                try{


                   if (it.hasNext()){

                       if(logEnabled){
String data = new String(it.next().message()) + "\n";
                           bw.write(data);
                           bw.flush();
                            outWriter.print(data);
                            outWriter.flush();
                            consumer.commitOffsets();
                            logger.info("Explicit commit ......");
                       }else{

outWriter.print(new String(it.next().message()) + "\n");
                           outWriter.flush();
                       }

                    }
                  // logger.info("running");


                } catch(ConsumerTimeoutException ex) {

                    keepRunningThread =false;
                    break;

                  }catch(NullPointerException npe ){

                      keepRunningThread =true;
                      npe.printStackTrace();
                  }catch(IllegalStateException ile){
                      keepRunningThread =true;
                      ile.printStackTrace();
                  }

                }

            }

        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId,String session_timeout,String auto_reset,String commitEnable) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", session_timeout);
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.offset.reset", auto_reset);
        props.put("auto.commit.interval.ms", "60000");
        props.put("consumer.timeout.ms", "30");  
        props.put("auto.commit.enable",commitEnable);
        //props.put("rebalance.max.retries", "4"); 


        return new ConsumerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {

        String zooKeeper = PropertyUtils.getProperty("ZOOKEEPER_URL_PORT");  
        String groupId =  PropertyUtils.getProperty("ZOOKEPER_GROUP_ID");
        String session_timeout =  PropertyUtils.getProperty("ZOOKEPER_SESSION_TIMOUT_MS"); //6400
        String auto_reset =  PropertyUtils.getProperty("ZOOKEPER_AUTO_RESET");  //smallest
        String enableLogging =  PropertyUtils.getProperty("ENABLE_LOG");
        String directoryPath =  PropertyUtils.getProperty("LOG_DIRECTORY");
        String log4jpath = PropertyUtils.getProperty("LOG_DIR");
        String commitEnable = PropertyUtils.getProperty("ZOOKEPER_COMMIT"); //false
        PropertyConfigurator.configure(log4jpath);

        String socketURL = PropertyUtils.getProperty("SOCKET_CONNECT_HOST");  
        int socketPort = Integer.parseInt(PropertyUtils.getProperty("SOCKET_CONNECT_PORT")); 
        try {
            Socket socks = new Socket(socketURL,socketPort);
            boolean connected = socks.isConnected() && !socks.isClosed();
            if(connected){
                //System.out.println("Able to connect ");
            }else{
                logger.info("Not able to connect to socket. Exiting...");
                System.exit(0);
            }
        } catch (UnknownHostException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch(java.net.ConnectException cne){
            logger.info("Not able to connect to socket. Exiting...");
            System.exit(0);
        }
        catch (IOException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

      //  String zooKeeper = args[0];
       // String groupId = args[1];
        String topic = args[0];
        int threads = 1;

        logEnabled = Boolean.parseBoolean(enableLogging);
        if(logEnabled)
            createDirectory(topic,directoryPath);

        ConsumerForKafka example = new ConsumerForKafka(zooKeeper, groupId, topic, session_timeout,auto_reset,commitEnable);
        try {
            example.run(threads,zooKeeper,topic);
        } catch(java.net.ConnectException cne){
            cne.printStackTrace();
            System.exit(0);
        }
        catch (IOException e) {
            // TODO Auto-generated catch block

            e.printStackTrace();


        }


    }

    private static void createDirectory(String topic,String d_Path) {

        try{
        File file = new File(d_Path);
        if (!file.exists()) {
            if (file.mkdir()) {
                logger.info("Directory created: " + file.getPath());
            } else {
                logger.info("Directory creation failed");
            }
        }

         fileName = new File(d_Path + topic + ".log");
        if (!fileName.exists()) {
            fileName.createNewFile();
        }



        }catch(IOException ioe){
            logger.info("IOException during directory or file creation", ioe);
        }


    }
}
1 Answer


After rereading your post carefully, I think what you ran into is expected behaviour.

I started the consumerkafka.jar with consumer group name as “ncdev1” and ZOOKEPER_AUTO_RESET = smallest . Could observe that init offset is set as -1. Then I stop/started the jar after sometime. At this time, it picks the latest offset assigned to the consumer group (ncdev1) ie 36.

auto.offset.reset only applies when there is no initial offset or when the stored offset is out of range. Since you only have 36 messages in the log, it's possible for the consumer group to read all those records very quickly; that's why you see the consumer group always pick up the latest offsets every time it is restarted.
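In other words, once a group has committed an offset, auto.offset.reset is ignored for that group. A minimal sketch of a config that would actually trigger the reset, assuming the old high-level consumer's property names as used in your code (the Zookeeper address and group id here are placeholders):

```java
import java.util.Properties;

public class FreshGroupConfig {
    // Build a consumer config for a group id that has never committed offsets.
    // auto.offset.reset only kicks in for such a group (or when the stored
    // offset is out of range), so a brand-new group id is what forces
    // "smallest" to be honoured and the consumer to start from the beginning.
    public static Properties freshConfig(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);              // must be a never-used group id
        props.put("auto.offset.reset", "smallest");  // old consumer: smallest/largest
        props.put("auto.commit.enable", "false");    // avoid committing while testing
        return props;
    }

    public static void main(String[] args) {
        // A timestamped group id is one cheap way to guarantee "never used".
        Properties p = freshConfig("localhost:2181",
                                   "ncdev-" + System.currentTimeMillis());
        System.out.println(p.getProperty("auto.offset.reset")); // prints "smallest"
    }
}
```

Reusing ncdev1/ncdev2/ncdev3 will not work, because each of those groups already has a committed offset in Zookeeper by the time you restart.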

amethystic
  • Thank you. Can you tell me what commitOffsets does? The docs say it commits the offset explicitly if the config 'auto.commit.enable' is set to false, but I still have a few doubts about exactly how it works. For example, my consumer consumed 36 msgs. While iterating them one by one, upon trying to write the 21st message my TCP connection got closed or my application got killed. In this case, what will my offset value be? Will it be 20? If it is 20, on restart will it start reading from the 21st? – Jineesh Lk Apr 27 '17 at 03:53
  • If `auto.commit.enable` is set to false, users take care of committing offsets themselves. You could store offsets anywhere: a database or a NoSQL store, for instance. But if you still want to store offsets in Zookeeper or in Kafka (set `offsets.storage` = kafka), then manually invoking `commitOffsets` helps. – amethystic Apr 27 '17 at 04:02
  • That's why committing offsets is required, and Kafka promises "at-least-once" semantics in case of consumer failures. As for the committed offset, it really means "the next message to be read". – amethystic Apr 27 '17 at 04:11
  • auto.offset.reset only applies when there is no initial offset or if an offset is out of range. – Jineesh Lk Apr 27 '17 at 04:35
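The commit semantics discussed in the comments above (a committed offset means "the next message to be read") can be sketched without a broker. The in-memory log and offset field below are purely illustrative stand-ins for the topic and the Zookeeper/Kafka offset store; no real Kafka is involved:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetSemanticsDemo {
    // Stand-in for the offset store (Zookeeper/Kafka in the real setup).
    // A committed offset of N means "the next message to read is at offset N".
    private long committed = 0;

    // Consume from the committed offset, committing after every message, and
    // stop after 'crashAfter' messages to simulate the process being killed.
    public long consume(List<String> log, int crashAfter) {
        long pos = committed;
        int processed = 0;
        while (pos < log.size() && processed < crashAfter) {
            String msg = log.get((int) pos);  // "write to the socket" would go here
            pos++;
            committed = pos;                  // explicit commit, like commitOffsets()
            processed++;
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 36; i++) log.add("msg-" + i);

        OffsetSemanticsDemo consumer = new OffsetSemanticsDemo();
        long afterCrash = consumer.consume(log, 20);    // killed after the 20th message
        System.out.println(afterCrash);                 // 20: offsets 0..19 were read

        long afterRestart = consumer.consume(log, Integer.MAX_VALUE);
        System.out.println(afterRestart);               // 36: restart resumed at offset 20
    }
}
```

So in the scenario from the comments, if the 20th message was processed and committed before the crash, the committed offset is 20 and the restarted consumer resumes with the 21st message. If the crash happens after processing but before the commit, that message is redelivered, which is exactly the at-least-once behaviour mentioned above.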