10

I want to batch messages using the KStream interface.

I have a stream of keys/values. I tried to collect them in a tumbling window and then process the complete window at once.

// Kafka Streams 0.10.0 API: aggregate each key's events into a HashMap per 100 ms tumbling window
builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
                .aggregateByKey(
                        HashMap::new,
                        (aggKey, value, aggregate) -> {
                            aggregate.put(value.getUuid(), value);
                            return aggregate;
                        },
                        TimeWindows.of("intentWindow", 100),
                        longSerde, mapSerde)
                .foreach((wk, values) -> {

The problem is that foreach gets called on every update to the KTable. I would like to process the whole window only once it is complete, i.e. collect the data for 100 ms and then process it all at once in foreach.

16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 298
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 299
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 1
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 2
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 3
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 4
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 5
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 6

At some point the next window starts with one entry in the map, so I can't even tell when a window is complete.

Any hints on how to batch process in Kafka Streams?

samst
  • Possible duplicate of [How to send final kafka-streams aggregation result of a time windowed KTable?](http://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable) – Matthias J. Sax Aug 24 '16 at 19:08

3 Answers

5

My actual task is to push updates from the stream to Redis, but I don't want to read/update/write individually, even though Redis is fast. My solution for now is to use KStream.process() to supply a processor that adds records to a queue in process() and then processes the queue in punctuate().

public class BatchedProcessor extends AbstractProcessor<Long, IntentUpdateEvent> {

    private final Writer writer;
    private final long schedulePeriodic;

    BatchedProcessor(Writer writer, long schedulePeriodic) {
        this.writer = writer;
        this.schedulePeriodic = schedulePeriodic;
    }

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // ask the framework to call punctuate() every schedulePeriodic ms (0.10.0 Processor API)
        context.schedule(schedulePeriodic);
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);
        // drain and write the whole batch, then commit the consumed offsets
        writer.processQueue();
        context().commit();
    }

    @Override
    public void process(Long aLong, IntentUpdateEvent intentUpdateEvent) {
        // only buffer here; the actual write happens in punctuate()
        writer.addToQueue(intentUpdateEvent);
    }
}
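
Wiring the processor into the topology is then a one-liner via KStream.process(). A minimal sketch, reusing the serdes and topic from the question; the writer instance and the 1000 ms punctuate interval are illustrative assumptions:

// Sketch only: "writer" and the 1000 ms interval are placeholders, not values from the post.
// KStream.process() (Kafka Streams 0.10.x) terminates the stream with the batching processor.
builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
        .process(() -> new BatchedProcessor(writer, 1000L));

ProcessorSupplier has a single get() method, so a lambda works here.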

I still have to test it, but it solves the problem I had. One could easily write such a processor in a very generic way. The API is very neat and clean, but a processBatched((List batchedMessages) -> ..., timeInterval OR countInterval) that collects KeyValues in a store, uses punctuate to process the batch, and commits at that point might be a useful addition.

But maybe this was intended to be solved with a Processor, keeping the API focused purely on low-latency, one-message-at-a-time processing.

samst
  • How do you tackle later arriving (i.e., out-of-order) records? – Matthias J. Sax Dec 02 '16 at 17:19
  • Some use cases may not care about late-arriving data (IoT sensor data, for example), so it would be great if one could tune the stream in that way. – Hendrik Jander Dec 23 '16 at 00:17
  • What happens if you add a load of messages using process, the consumer offset is committed and then the application dies before punctuate is called? Aren't those messages lost forever? – bm1729 Mar 17 '17 at 11:32
  • I think @bm1729 is right here. It seems like you would be exposing yourself to missing data. – Scalahansolo Feb 12 '18 at 20:24
4

Right now (as of Kafka 0.10.0.0 / 0.10.0.1): The windowing behavior you are describing is "working as expected". That is, if you are getting 1,000 incoming messages, you will (currently) always see 1,000 updates going downstream with the latest versions of Kafka / Kafka Streams.

Looking ahead: The Kafka community is working on new features to make this update-rate behavior more flexible (e.g. to allow for what you described above as your desired behavior). See KIP-63: Unify store and downstream caching in streams for more details.
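
For reference, after KIP-63 landed (Kafka Streams 0.10.1+), the downstream update rate became tunable through the record cache. A minimal sketch of the relevant configuration; the concrete values are illustrative assumptions, not tuning advice:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Sketch: with KIP-63, a larger record cache plus a longer commit interval lets updates
// for the same window/key be coalesced before they are forwarded downstream.
Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB record cache (assumed value)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);                     // flush/forward roughly every second (assumed value)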

miguno
3

====== Update ======

On further testing, this does not work. The correct approach is to use a processor as outlined by @friedrich-nietzsche. I am down-voting my own answer.... grrrr.

===================

I am still wrestling with this API (but I love it, so it's time well spent :)), and I am not sure what you're trying to accomplish downstream from where your code sample ended, but it looks similar to what I got working. The high level is:

An object is read from the source. It represents a key and 1:∞ events, and I want to publish the total number of events per key every 5 seconds (or TP5s, transactions per 5 seconds). The beginning of the code looks the same, but I use:

  1. KStreamBuilder.stream
  2. reduceByKey
  3. to a window(5000)
  4. to a new stream which gets the accumulated value for each key every 5 secs.
  5. map that stream to a new KeyValue per key
  6. to the sink topic.

In my case, in each window period I can reduce all events to one event per key, so this works. If you want to retain all the individual events per window, I assume you could use reduce to map each instance to a collection of instances (possibly with the same key, or you might need a new key), and at the end of each window period the downstream stream would get a bunch of collections of your events (or maybe just one collection of all the events), all in one go. It looks like this, sanitized and Java 7-ish:

    // Java 7-style: reduce per key into 5-second tumbling windows, re-key, and forward to the sink topic
    builder.stream(STRING_SERDE, EVENT_SERDE, SOURCE_TOPICS)
        .reduceByKey(eventReducer, TimeWindows.of("EventMeterAccumulator", 5000), STRING_SERDE, EVENT_SERDE)
        .toStream()
        .map(new KeyValueMapper<Windowed<String>, Event, KeyValue<String, Event>>() {
            @Override
            public KeyValue<String, Event> apply(final Windowed<String> key, final Event finalEvent) {
                // unwrap the window and stamp the aggregated event with the window end time
                return new KeyValue<String, Event>(key.key(), new Event(key.window().end(), finalEvent.getCount()));
            }
        })
        .to(STRING_SERDE, EVENT_SERDE, SINK_TOPIC);
Nicholas
  • So this will output 1 value per key every 5 seconds? I'm still not 100% clear on step 4. From what I'm seeing you would get a stream of changes there. Or is that what you mean? – ethrbunny Aug 23 '16 at 20:17
  • I get one key/value per key every 5 seconds. The value represents the total count of events in that time period (each source event can represent more than one "sub-event"). But if you created an aggregation object like a single Map, you could deliver a single object downstream. – Nicholas Aug 23 '16 at 20:23
  • Doesn't the toStream() mess everything up again? reduceByKey gets you a KTable, but without any cache or buffer, turning it into a KStream again will just yield an update event for each, well, update to a key. I think I may have to detect my batch outside, when I process the window: as soon as the window key changes I know the last tumbling window is done and my batch is full. I don't know what it does to garbage collection, as I am not sure whether the aggregate is always the same object or there are deep copies. – samst Aug 24 '16 at 07:47
  • Fair question. An actual code sample needed. Gist coming anon. – Nicholas Aug 24 '16 at 11:07
  • Updated answer as a non-answer. It does not work. The Processor approach is the way to go. – Nicholas Aug 25 '16 at 09:55