0

I'm currently using the kafka-streams library.

What I'm trying to do : (Using kafka-streams) I'm trying to consume from topic, manipulate the message-value, acknowledge that message and transmit the result to another topic (code below)

Properties properties = new Properties();
.
.
properties.put("enable.auto.commit", false);

StreamBuilder builder = new StreamBuilder();
KStream kStream = builder.stream("MyTopic");
KafkaStream kafkaStream = new KafkaStream(builder.build(), properties)

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

kStream.forEach(new ForeachAction<String, String>(){

 @Override
 public void apply(Strings arg, String value){
  //Just doing some simple data manipulation
  String myValue = value + new Date().toString();

  //Sending result to new topic
  producer.send(new ProducerRecord<String,String>("MyTopicWithTimeStamp", myValue)):
  // Problem (1) Here -> How do I acknowledge this from here manually

  // Problem (2) How should I properly handle/close my producer (if at all)
 }
});

kafkaStram.start();

What I dont know how to do : Acknowledge the message properly using the kafka-streams library

stackoverflow
  • 18,348
  • 50
  • 129
  • 196
  • 1
    There is lot of documentation about Kafka Streams: https://kafka.apache.org/21/documentation/streams/, https://docs.confluent.io/current/streams/index.html, etc – Bartosz Wardziński Jan 30 '19 at 08:28
  • @wardziniak How then do you redirect to different producers for a given condition? Lets say 'record' (in the above code) is null or contains the word "blue"; how would we send these conditions to separate producers? – stackoverflow Jan 30 '19 at 17:16
  • 1
    If you use Kafka Streams, you don't use Kafka Producer directly. How I understand your use case, you want to send message to different topics depends on some predicate, for that you can use `KStream::to(final TopicNameExtractor topicExtractor, final Produced produced)` – Bartosz Wardziński Jan 30 '19 at 17:45
  • 1
    `properties.put("enable.auto.commit", false);` is set to `false` by default anyway -- and you cannot even set it to `true`. Cf: https://stackoverflow.com/questions/43416178/how-to-commit-manually-with-kafka-stream – Matthias J. Sax Feb 01 '19 at 17:15

2 Answers2

3

You could use KStream#to to directly write to Kafka output topic instead of creating and maintaining your own producer instance, as shown below:

final Properties props = new Properties();
...
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source = builder.stream("MyTopic");
source.mapValues(record -> record + new Date().toString()).to("MyTopicWithTimeStamp", 
    Produced.with(Serdes.String(), Serdes.String()));
...
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
amethystic
  • 6,821
  • 23
  • 25
2

Instead of using manual approach, you can instead write through kafka stream API directly as @amethystic also suggested.

source.mapValues(record -> record + new Date().toString()).to("MyTopicWithTimeStamp", 
    Produced.with(Serdes.String(), Serdes.String()));

Regarding the acknowlegdement, you can set the property in stream configuration to make sure that producer receives acknowledgement while publishing to topics.

final Properties props = new Properties();
props .put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

You can refer the stream configurations : https://kafka.apache.org/21/documentation/streams/developer-guide/config-streams.html#acks

Nishu Tayal
  • 20,106
  • 8
  • 49
  • 101