I need to read all messages from a Kafka topic, process them, and exit (there is no need to run forever like a daemon). I have written the code below. It serves the purpose if messages are available in the topic, but if the topic is empty (or there are no new messages for the given group_id), it waits until the next message arrives. I need it to exit immediately when there are no messages to process. Please have a look at my code and suggest a better way to achieve this. I am using the ruby-kafka 1.3.0 gem.

require 'kafka'

khost = 'xxx.xxx.xxx.xxx'
kport = 'xxxx'

kafka = Kafka.new(["#{khost}:#{kport}"])
consumer = kafka.consumer(group_id: "my-consumer")
consumer.subscribe("my-topic")

# Blocks until a batch arrives, so this never returns on an empty topic.
first_batch = nil
consumer.each_batch do |batch|
  first_batch = batch
  consumer.stop  # stop after reading the first batch
end

# Process messages here
first_batch.messages.each do |message|
  puts message.value
end

I have also found the method kafka.fetch_messages. However, I did not find an option to maintain a group_id and track already processed messages without writing additional code.
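One possible direction, given as a rough sketch rather than a confirmed solution: keep the consumer group (so offsets are still tracked per group_id) and let a watchdog thread call consumer.stop once no batch has arrived for a while. The helper name drain_until_idle and the parameters idle_timeout and poll_interval are hypothetical and not part of ruby-kafka; the only consumer methods used are each_batch and stop, as in the code above. This assumes stop may be called from another thread and that each_batch returns shortly afterwards, which should be verified against the installed gem version.

```ruby
# Hypothetical helper (not part of ruby-kafka): consume until the topic has
# been quiet for `idle_timeout` seconds, then stop the consumer and return.
# `consumer` is any object that responds to #each_batch and #stop.
def drain_until_idle(consumer, idle_timeout: 10, poll_interval: 0.5)
  clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  lock = Mutex.new
  last_activity = clock.call
  processed = 0

  # Watchdog: asks the consumer to stop once nothing has arrived for a while.
  watchdog = Thread.new do
    loop do
      sleep poll_interval
      idle_for = lock.synchronize { clock.call - last_activity }
      if idle_for >= idle_timeout
        consumer.stop
        break
      end
    end
  end

  consumer.each_batch do |batch|
    batch.messages.each do |message|
      yield message            # caller-supplied processing
      processed += 1
    end
    # Reset the idle timer after every batch that was handled.
    lock.synchronize { last_activity = clock.call }
  end

  processed
ensure
  watchdog&.kill
end

# Intended usage with the consumer from the question (assumed, untested here):
#
#   count = drain_until_idle(consumer, idle_timeout: 15) do |message|
#     puts message.value
#   end
#   puts "processed #{count} messages"
```

The idle timeout probably has to be longer than the time the consumer needs to join the group and complete its first fetch; otherwise the watchdog could stop it before any message is delivered. Processing inside the block, instead of keeping the batch in a variable and handling it afterwards, also means that more than one batch can be drained in a single run.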

Shan Sunny