I'm currently using the kafka-streams library.
What I'm trying to do: using kafka-streams, I want to consume from a topic, manipulate the message value, acknowledge that message, and send the result to another topic (code below).
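import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;

Properties properties = new Properties();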
.
.
properties.put("enable.auto.commit", false);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> kStream = builder.stream("MyTopic");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
kStream.foreach(new ForeachAction<String, String>() {
    @Override
    public void apply(String key, String value) {
        // Just doing some simple data manipulation
        String myValue = value + new Date().toString();
        // Sending result to new topic
        producer.send(new ProducerRecord<String, String>("MyTopicWithTimeStamp", myValue));
        // Problem (1) Here -> How do I acknowledge this from here manually
        // Problem (2) How should I properly handle/close my producer (if at all)
    }
});
// Build the topology only after all operators have been added to the builder
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start();
What I don't know how to do: acknowledge the message properly using the kafka-streams library.
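For comparison, here is a minimal sketch of the same consume, manipulate, and forward pipeline written purely with the Streams DSL, so that no separate KafkaProducer is involved. The class name TimestampTopology, the helper withTimestamp, the application id "timestamp-app", the broker address localhost:9092, and the 1000 ms commit interval are all assumptions made for illustration; only the two topic names come from the code above.

```java
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class TimestampTopology {

    // Hypothetical helper: the same manipulation as in the question, kept pure.
    static String withTimestamp(String value, Date now) {
        return value + now.toString();
    }

    // Builds the topology: MyTopic -> append timestamp -> MyTopicWithTimeStamp.
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("MyTopic");
        source.mapValues(value -> withTimestamp(value, new Date()))
              .to("MyTopicWithTimeStamp"); // Streams uses its own internal producer here
        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed values; replace with the real application id and brokers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timestamp-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Offsets are committed by the library on this interval (assumed value).
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

        KafkaStreams streams = new KafkaStreams(build(), props);
        // Closing the streams instance also releases its internal clients.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

In this form there is nothing to acknowledge by hand: Kafka Streams commits consumer offsets itself, at the frequency set by commit.interval.ms, and it overrides enable.auto.commit for its own consumer. There is also no user-managed producer to close, since to() writes through the producer that the library owns and kafkaStreams.close() shuts it down.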