0

I have below Kafka consumer which I want to pause in certain condition and then resume it later to consume all the previous message. One idea is to use shared flag which can be updated by other thread and before consuming, i.e., iterator.next().message() I check the value of the flag. If it's true don't consume else consume the message. Just wanted to check if I am thinking in right direction or if there is a better way of doing it.

class KafkaConsumer implements Runnable {
        KafkaStream<byte[], byte[]> topicStream;
        ConsumerConnector consumerConnectorObj;

        public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream,
                final ConsumerConnector consumerConnectorObj) {
            this.topicStream = topicStream;
            this.consumerConnectorObj = consumerConnectorObj;
        }

        @Override
        public void run() {
            if (topicStream != null) {
                ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator();
                while (true) {
                        if (iterator != null) {
                            boolean nextFlag = true;
                            try {
                                iterator.hasNext();
                            } catch (ConsumerTimeoutException e) {
                                LOG.warn("Consumer timeout occured", e);
                                nextFlag = false;
                            }

                            if (nextFlag) {
                                byte[] msg = iterator.next().message();
                            }
                        }
                }
            }
        }
    }
Matthias J. Sax
  • 59,682
  • 7
  • 117
  • 137
Rishi Saraf
  • 1,644
  • 2
  • 14
  • 27

1 Answers1

1

Normally you should not need to synchronize between threads by hand-crafted means, because you have a good chance to get it wrong. Have look at java.util.concurrent for helpers. In your case it could be a semaphore. The easiest way to use it would be to acquire the semaphore before processing a message, handing it back afterwards and trying to acquire it again right away in the next loop.

My hunch is, that this is not the best way to do it though. I'd rather call availablePermits() and keep consuming while the number is greater one. Only when it drops to zero, try to acquire the semaphore. This will block the thread until the other thread has provided one permit again. This will unblock your worker thread, and hand it out the permit, which it should give back right away again, starting to loop as above.

while (true) {
  if (semaphore.availablePermits()<=0) {
    semaphore.acquire(); // will block
    // eventually another thread increments the semaphore again
    // and we arrive here
    semaphore.release();
  }
  // keep processing messages
}

You may find additional ideas in answers to this question.

Community
  • 1
  • 1
Harald
  • 4,575
  • 5
  • 33
  • 72