
I use Kafka Streams to process real-time data, and I need to perform some aggregate operations over the data in a time window.

I have two questions about the aggregate operation.

  1. How do I get the aggregated data? I need to send it to a third-party service.
  2. After the aggregate operation, I can't send a message to the third-party service; the code doesn't run.

Here is my code:

stream = builder.stream("topic");
windowedKStream = stream.map(XXXXX).groupByKey().windowedBy("5mins");
ktable = windowedKStream.aggregate(()->"", new Aggregator(K,V,result));

// My data is stored in the 'result' variable, but I can't get it at the end of the 5-minute window.
// I need to send 'result' to a third-party service, but I don't know where to store it temporarily or how to retrieve it.

// Below is the code that calls the third-party service, but it is never executed.
// I expected it to run every 5 minutes, whenever a window ends, but it doesn't.

result = httpclient.execute(result);
NingLee
  • What do you mean by "my data is stored in 'result' variable"? Also note that Kafka Streams windowed aggregations emit a result record for each *update* to the window -- there is no notion of a final result. Cf: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ -- there are also a couple of SO questions about "final result" that you might want to check out. – Matthias J. Sax Aug 22 '18 at 05:21
  • @MatthiasJ.Sax I'm sorry, 'result' is the third parameter of the Aggregator() function. – NingLee Aug 23 '18 at 01:00
  • This is not how Kafka Streams work... Hope my answer helps. – Matthias J. Sax Aug 23 '18 at 05:18

1 Answer


I guess you might want to do something like:

ktable.toStream().foreach((k,v) -> httpclient.execute(v));

Each time the KTable is updated (with caching disabled), the update record will be sent downstream, and foreach will be executed with v being the current aggregation result.
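To make the per-update behaviour concrete, here is a plain-Java simulation (no Kafka dependency) of what `foreach` observes for a single 5-minute window when caching is disabled. `UpdateEmissionDemo`, `aggregateAndEmit`, and the `Consumer` that stands in for `httpclient.execute` are hypothetical names chosen for illustration. This is a sketch of the semantics, not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java simulation (hypothetical, not the Kafka Streams API) of a windowed
// sum whose downstream callback fires on every update, the way foreach does
// when record caching is disabled.
class UpdateEmissionDemo {

    // Folds each value into the running aggregate and forwards the new
    // aggregate downstream after every single update.
    static int aggregateAndEmit(List<Integer> valuesInWindow, Consumer<Integer> downstream) {
        int result = 0;                  // initializer: the empty aggregate
        for (int value : valuesInWindow) {
            result = result + value;     // aggregator: (key, value, result) -> result + value
            downstream.accept(result);   // stands in for httpclient.execute(v) inside foreach
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> sent = new ArrayList<>();
        aggregateAndEmit(List.of(4, 5, 6), sent::add);
        System.out.println(sent); // [4, 9, 15]
    }
}
```

In actual Kafka Streams code, the key passed to `foreach` after `toStream()` is a `Windowed<K>`, so `k.window().start()` and `k.window().end()` tell you which window the value `v` belongs to.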

Matthias J. Sax
  • Thanks very much, it works for us! But as you mentioned, httpclient.execute is executed each time the KTable is updated, which is not what I want. I want to aggregate the windowed data: for example, I want the sum of all the numbers in a 5-minute window. If the window contains [4, 5, 6], I only want the final result (the sum), 15. But right now it outputs 4, 9, and 15, recalculating the sum each time a new record arrives. – NingLee Aug 23 '18 at 12:40
  • Can I do this using Kafka streams? – NingLee Aug 23 '18 at 12:41
  • Currently there is no "final" result. Read the blog post: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ -- Also compare: https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable -- The upcoming 2.1 release will include what you want. Compare: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables – Matthias J. Sax Aug 23 '18 at 18:20
  • In the end I used KStream.transform() to filter out the non-final results, then used transform() again to turn the REST response into a KStream and write the result to a Kafka topic. It works, but I suspect it is not the best practice. – NingLee Aug 24 '18 at 10:06
  • For now, it is best practice. In the upcoming 2.1 release, we plan to add `Suppress` configuration so you don't have to write custom code. – Matthias J. Sax Aug 24 '18 at 17:03
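The custom transform() workaround and the suppression feature from KIP-328 rest on the same idea: keep only the latest aggregate per window and emit it once, after stream time has moved past the window's end plus a grace period. The plain-Java sketch below simulates that for tumbling 5-minute windows. `FinalResultDemo`, `process`, and the zero grace period are hypothetical choices for illustration, not Kafka Streams internals. In Kafka Streams 2.1 and later, the built-in equivalent is `ktable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))`, normally combined with an explicit grace period on the window definition.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (hypothetical, not the Kafka Streams API) of emitting only
// the final aggregate of each tumbling window, driven by advancing stream time.
class FinalResultDemo {

    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L; // 5-minute tumbling windows
    static final long GRACE_MS = 0L;                   // assumption: late records are not accepted

    // Input: {timestamp in ms, value} pairs in arrival order.
    // Output: one "windowStart=sum" string per window, emitted once when it closes.
    static List<String> process(List<long[]> records) {
        Map<Long, Long> openWindows = new TreeMap<>(); // window start -> latest sum
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;              // highest timestamp seen so far

        for (long[] record : records) {
            long timestamp = record[0];
            long value = record[1];
            streamTime = Math.max(streamTime, timestamp);

            long windowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
            // Drop records whose window has already closed (late data).
            if (windowStart + WINDOW_SIZE_MS + GRACE_MS <= streamTime) {
                continue;
            }
            // Update the buffered aggregate; nothing is sent downstream yet.
            openWindows.merge(windowStart, value, Long::sum);

            // Emit and forget every window whose end + grace is now behind stream time.
            Iterator<Map.Entry<Long, Long>> it = openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> window = it.next();
                if (window.getKey() + WINDOW_SIZE_MS + GRACE_MS <= streamTime) {
                    emitted.add(window.getKey() + "=" + window.getValue());
                    it.remove();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<long[]> records = List.of(
                new long[] {0L, 4L},        // 00:00 -> window [0, 5min)
                new long[] {60_000L, 5L},   // 01:00 -> same window
                new long[] {120_000L, 6L},  // 02:00 -> same window
                new long[] {310_000L, 1L}); // 05:10 -> next window; stream time passes 05:00
        System.out.println(process(records)); // [0=15]
    }
}
```

Only the sum 15 leaves the first window; the intermediate values 4 and 9 are never emitted. The second window stays buffered until a later record advances stream time past its end, which is also why suppression only emits when new data keeps arriving.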