
My goal is to group the messages I receive from a Kafka topic by user and window them, so that I can aggregate the messages received in each (5-minute) window. I would then like to collect all the aggregates of a window and process them at once, adding them to a report of all the messages received in that 5-minute interval.

The last step seems to be the hard part: Kafka Streams doesn't seem to provide (at least I can't find it!) anything that collects everything belonging to a window into a "finite" stream that can be processed in one place.

This is the code I implemented:

StreamsBuilder builder = new StreamsBuilder();
KStream<UserId, Message> messages = builder.stream("KAFKA_TOPIC");

TimeWindowedKStream<UserId, Message> windowedMessages =
        messages.
                groupByKey().windowedBy(TimeWindows.of(SIZE_MS));

KTable<Windowed<UserId>, List<Message>> messagesAggregatedByWindow =
        windowedMessages.
                aggregate(
                        () -> new LinkedList<>(), new MyAggregator<>(),
                        Materialized.with(new MessageKeySerde(), new MessageListSerde())
                );

messagesAggregatedByWindow.toStream().foreach((key, value) -> log.info("({}), KEY {} MESSAGE {}",  value.size(), key, value.toString()));

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

The result is something like:

KEY [UserId(82770583)@1531502760000/1531502770000] Message [Message(userId=UserId(82770583),message="a"),Message(userId=UserId(82770583),message="b"),Message(userId=UserId(82770583),message="d")]
KEY [UserId(77082590)@1531502760000/1531502770000] Message [Message(userId=UserId(77082590),message="g")]
KEY [UserId(85077691)@1531502750000/1531502760000] Message [Message(userId=UserId(85077691),message="h")]
KEY [UserId(79117307)@1531502780000/1531502790000] Message [Message(userId=UserId(79117307),message="e")]
KEY [UserId(73176289)@1531502760000/1531502770000] Message [Message(userId=UserId(73176289),message="r"),Message(userId=UserId(73176289),message="q")]
KEY [UserId(92077080)@1531502760000/1531502770000] Message [Message(userId=UserId(92077080),message="w")]
KEY [UserId(78530050)@1531502760000/1531502770000] Message [Message(userId=UserId(78530050),message="t")]
KEY [UserId(64640536)@1531502760000/1531502770000] Message [Message(userId=UserId(64640536),message="y")]

Each window produces many log lines, and they are interleaved with the lines of other windows.

What I'd like to have is something like:

// Hypothetical implementation
windowedMessages.streamWindows((interval, window) -> process(interval, window));

where the method process would be something like:

// Hypothetical implementation

void process(Interval interval, WindowStream<UserId, List<Message>> windowStream) {
    // Create the report for the whole window
    Report report = new Report(nameFromInterval(interval));
    // Loop over the finite iterable that represents the window content
    for (WindowStreamEntry<UserId, List<Message>> entry : windowStream) {
        report.addLine(entry.getKey(), entry.getValue());
    }
    report.close();
}

The result would be grouped like this (each report corresponds to one call to my callback, void process(...)), and each window would be committed only once the whole window has been processed:

Report 1:
    KEY [UserId(85077691)@1531502750000/1531502760000] Message [Message(userId=UserId(85077691),message="h")]

Report 2:
    KEY [UserId(82770583)@1531502760000/1531502770000] Message [Message(userId=UserId(82770583),message="a"),Message(userId=UserId(82770583),message="b"),Message(userId=UserId(82770583),message="d")]
    KEY [UserId(77082590)@1531502760000/1531502770000] Message [Message(userId=UserId(77082590),message="g")]
    KEY [UserId(73176289)@1531502760000/1531502770000] Message [Message(userId=UserId(73176289),message="r"),Message(userId=UserId(73176289),message="q")]
    KEY [UserId(92077080)@1531502760000/1531502770000] Message [Message(userId=UserId(92077080),message="w")]
    KEY [UserId(78530050)@1531502760000/1531502770000] Message [Message(userId=UserId(78530050),message="t")]
    KEY [UserId(64640536)@1531502760000/1531502770000] Message [Message(userId=UserId(64640536),message="y")]

Report 3:
    KEY [UserId(79117307)@1531502780000/1531502790000] Message [Message(userId=UserId(79117307),message="e")]
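
To make the desired grouping concrete, here is a minimal plain-Java sketch of it, outside Kafka Streams. The names `WindowReports`, `Entry`, `windowStart` and `groupByWindow` are hypothetical and are not part of any Kafka API. It assumes tumbling windows whose boundaries are multiples of the window size, which matches the 10-second boundaries visible in the log above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of the desired output; not Kafka Streams code. */
final class WindowReports {

    /** One received message with its timestamp (hypothetical type). */
    static final class Entry {
        final long timestampMs;
        final String userId;
        final String message;

        Entry(long timestampMs, String userId, String message) {
            this.timestampMs = timestampMs;
            this.userId = userId;
            this.message = message;
        }
    }

    /** Start of the tumbling window containing the timestamp (assumes epoch-aligned windows). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Groups messages by window start, then by user; windows are kept in time order. */
    static Map<Long, Map<String, List<String>>> groupByWindow(List<Entry> entries, long sizeMs) {
        Map<Long, Map<String, List<String>>> reports = new TreeMap<>();
        for (Entry e : entries) {
            reports.computeIfAbsent(windowStart(e.timestampMs, sizeMs), w -> new TreeMap<>())
                   .computeIfAbsent(e.userId, u -> new ArrayList<>())
                   .add(e.message);
        }
        return reports;
    }
}
```

Each key of the outer map would correspond to one report, and each inner entry to one line of that report. This only works on a finite list held in memory, which is exactly what a stream does not give me.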
simo
  • If you want to get the "raw records" you can implement a windowed aggregation that returns a `List` as result type and apply the actual computation later. Instead of using the DSL, using the Processor API might also be an option. – Matthias J. Sax Jul 20 '18 at 21:09
  • Thank you Matthias. I tried to do exactly this, but unfortunately the windows are huge, so it is probably not safe to get them as a list. It would be better to get an iterator over the "list" of objects in the window that loads them in chunks (treating the window like a finite topic). – simo Jul 23 '18 at 07:17
  • I see. This won't work out-of-the-box. Maybe you need to implement a custom window operator using Processor API. – Matthias J. Sax Jul 23 '18 at 17:15

1 Answer

I had the same question. I talked with the developers of the library, and they said that this is a very common request that is not implemented yet. It will be released soon.

You can find more information here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables

Bruno
  • Thank you @realBigfoot. I'm not sure it fits my problem. I read: (1) "allow to attach callbacks in kafka streams, to be triggered when a window is expired" (https://issues.apache.org/jira/browse/KAFKA-6556); (2) "For our new Suppress operator to support window final results, we need to define a point at which window results are actually final!" This sounds close to what I need, but I don't see examples of windows processed "at once". I'd like a stream of windows that calls our callback, passing an object (an iterable?) representing the result of the aggregation over the whole window. – simo Jul 20 '18 at 15:16
  • I've just updated the post adding a description of how I want the data to be grouped. – simo Jul 20 '18 at 15:23
  • I spoke with one of the developers. To quote what he told me: "In that case, one workaround would be to query the state store directly after you know that no more updates would be applied to that store in a `punctuation` function: note that punctuation is a feature that's only available in the Processor API, but you can always add such a lower-level implementation into your DSL topology by calling `KStream#process() / transform()`." – Bruno Jul 24 '18 at 11:02
  • @Bruno it seems the improvement you were looking for is now available. I just used it to achieve something similar (`.suppress(Suppressed.untilWindowCloses(unbounded()))`). – Vassilis Nov 23 '19 at 10:37
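
The workaround quoted above (buffer results, then read them back from a punctuation once no more updates are expected) can be modelled in plain Java. This is only a sketch of the idea under stated assumptions, not Kafka Streams code: `WindowBuffer`, `add` and `punctuate` are hypothetical names, the in-memory `TreeMap` stands in for a state store, and late records and grace periods are ignored. A real implementation would presumably keep the buffer in a state store and call the flush logic from a Processor API punctuation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Plain-Java model of "buffer per window, emit once the window is closed". */
final class WindowBuffer {
    private final long sizeMs;
    // Called once per closed window with (windowStart, userId -> messages).
    private final BiConsumer<Long, Map<String, List<String>>> onWindowClosed;
    // Open windows keyed by start time; the TreeMap keeps them in time order.
    private final TreeMap<Long, Map<String, List<String>>> open = new TreeMap<>();

    WindowBuffer(long sizeMs, BiConsumer<Long, Map<String, List<String>>> onWindowClosed) {
        this.sizeMs = sizeMs;
        this.onWindowClosed = onWindowClosed;
    }

    /** Adds one message to the tumbling window that contains its timestamp. */
    void add(long timestampMs, String userId, String message) {
        long start = timestampMs - (timestampMs % sizeMs);
        open.computeIfAbsent(start, w -> new TreeMap<>())
            .computeIfAbsent(userId, u -> new ArrayList<>())
            .add(message);
    }

    /** Plays the role of a punctuation: flushes every window whose end is <= nowMs. */
    void punctuate(long nowMs) {
        while (!open.isEmpty() && open.firstKey() + sizeMs <= nowMs) {
            Map.Entry<Long, Map<String, List<String>>> closed = open.pollFirstEntry();
            onWindowClosed.accept(closed.getKey(), closed.getValue());
        }
    }
}
```

The callback receives a whole window at once, which is the shape of the `process(interval, window)` call asked for in the question. Note that this still holds each open window in memory, so it does not address the concern about huge windows raised in the comments on the question.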