When a Kafka consumer is launched from the terminal, it is possible to pass --from-beginning so that the consumer reads messages from the beginning of a Kafka topic.
~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TutorialTopic --from-beginning
However, how can I set this parameter in Scala (or Java)? This is my sample ConsumerConfig:
import java.util.Properties
import kafka.consumer.ConsumerConfig

def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
  val props = new Properties()
  props.put("zookeeper.connect", zookeeper)
  props.put("group.id", groupId)
  // "largest" starts at the newest offset when the group has no committed offset
  props.put("auto.offset.reset", "largest")
  props.put("zookeeper.session.timeout.ms", "400")
  props.put("zookeeper.sync.time.ms", "200")
  props.put("auto.commit.interval.ms", "1000")
  new ConsumerConfig(props)
}
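
For the old ZooKeeper-based consumer used above, the closest equivalent of --from-beginning that I know of is setting auto.offset.reset to "smallest" instead of "largest". Below is a minimal sketch of that change, not a definitive implementation. The helper name fromBeginningProps is hypothetical, and the block only builds the Properties so that it runs without the Kafka jar; the result would be passed to new ConsumerConfig(props) as in the question.

```scala
import java.util.Properties

// Hypothetical helper (not part of Kafka): builds consumer properties that
// ask the old high-level consumer to start at the earliest available offset.
def fromBeginningProps(zookeeper: String, groupId: String): Properties = {
  val props = new Properties()
  props.put("zookeeper.connect", zookeeper)
  props.put("group.id", groupId)
  // "smallest" = earliest available offset; "largest" = newest offset.
  // This is only consulted when the group has no committed offset yet.
  props.put("auto.offset.reset", "smallest")
  props
}

val props = fromBeginningProps("localhost:2181", "fresh-group-1")
println(props.getProperty("auto.offset.reset"))
```

Note that auto.offset.reset only applies when the consumer group has no offset stored in ZooKeeper, so re-reading a topic with an existing group would likely also require a new group.id (the "fresh-group-1" value here is just a placeholder).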